/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.ActiveTaskCreator;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StandbyTaskCreator;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsRebalanceListener;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

public class StreamThread
extends Thread {
    private final Time time;
    private final Logger log;
    private final String logPrefix;
    private final Object stateLock;
    private final Duration pollTime;
    private final long commitTimeMs;
    private final int maxPollTimeMs;
    private final String originalReset;
    private final TaskManager taskManager;
    private final AtomicLong nextProbingRebalanceMs;
    private final StreamsMetricsImpl streamsMetrics;
    private final Sensor commitSensor;
    private final Sensor pollSensor;
    private final Sensor pollRecordsSensor;
    private final Sensor punctuateSensor;
    private final Sensor processRecordsSensor;
    private final Sensor processLatencySensor;
    private final Sensor processRateSensor;
    private final Sensor pollRatioSensor;
    private final Sensor processRatioSensor;
    private final Sensor punctuateRatioSensor;
    private final Sensor commitRatioSensor;
    private long now;
    private long lastPollMs;
    private long lastCommitMs;
    private int numIterations;
    private volatile State state = State.CREATED;
    private volatile ThreadMetadata threadMetadata;
    private StateListener stateListener;
    private final ChangelogReader changelogReader;
    private final ConsumerRebalanceListener rebalanceListener;
    private final Consumer<byte[], byte[]> mainConsumer;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final Admin adminClient;
    private final InternalTopologyBuilder builder;

    public void setStateListener(StateListener listener) {
        this.stateListener = listener;
    }

    public State state() {
        return this.state;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    State setState(State newState) {
        State oldState;
        Object object = this.stateLock;
        synchronized (object) {
            oldState = this.state;
            if (this.state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                this.log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: only DEAD state is a valid next state", (Object)newState);
                return null;
            }
            if (this.state == State.DEAD) {
                this.log.debug("Ignoring request to transit from DEAD to {}: no valid next state after DEAD", (Object)newState);
                return null;
            }
            if (!this.state.isValidTransition(newState)) {
                this.log.error("Unexpected state transition from {} to {}", (Object)oldState, (Object)newState);
                throw new StreamsException(this.logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
            }
            this.log.info("State transition from {} to {}", (Object)oldState, (Object)newState);
            this.state = newState;
            if (newState == State.RUNNING) {
                this.updateThreadMetadata(this.taskManager.activeTaskMap(), this.taskManager.standbyTaskMap());
            } else {
                this.updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
            }
        }
        if (this.stateListener != null) {
            this.stateListener.onChange(this, this.state, oldState);
        }
        return oldState;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isRunning() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.isAlive();
        }
    }

    public static StreamThread create(InternalTopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, Admin adminClient, UUID processId, String clientId, StreamsMetricsImpl streamsMetrics, Time time, StreamsMetadataState streamsMetadataState, long cacheSizeBytes, StateDirectory stateDirectory, StateRestoreListener userStateRestoreListener, int threadIdx) {
        String threadId = clientId + "-StreamThread-" + threadIdx;
        String logPrefix = String.format("stream-thread [%s] ", threadId);
        LogContext logContext = new LogContext(logPrefix);
        Logger log = logContext.logger(StreamThread.class);
        log.info("Creating restore consumer client");
        Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(ClientUtils.getRestoreConsumerClientId(threadId));
        Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
        StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, restoreConsumer, userStateRestoreListener);
        ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
        ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(builder, config, streamsMetrics, stateDirectory, changelogReader, cache, time, clientSupplier, threadId, processId, log);
        StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(builder, config, streamsMetrics, stateDirectory, changelogReader, threadId, log);
        TaskManager taskManager = new TaskManager(changelogReader, processId, logPrefix, activeTaskCreator, standbyTaskCreator, builder, adminClient, stateDirectory, StreamThread.processingMode(config));
        log.info("Creating consumer client");
        String applicationId = config.getString("application.id");
        Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, ClientUtils.getConsumerClientId(threadId), threadIdx);
        consumerConfigs.put("__time__", time);
        consumerConfigs.put("__task.manager.instance__", taskManager);
        consumerConfigs.put("__streams.metadata.state.instance__", streamsMetadataState);
        consumerConfigs.put("__streams.admin.client.instance__", adminClient);
        AtomicInteger assignmentErrorCode = new AtomicInteger();
        consumerConfigs.put("__assignment.error.code__", assignmentErrorCode);
        AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
        consumerConfigs.put("__next.probing.rebalance.ms__", nextScheduledRebalanceMs);
        String originalReset = (String)consumerConfigs.get("auto.offset.reset");
        if (!builder.latestResetTopicsPattern().pattern().isEmpty() || !builder.earliestResetTopicsPattern().pattern().isEmpty()) {
            consumerConfigs.put("auto.offset.reset", "none");
        }
        Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(consumerConfigs);
        changelogReader.setMainConsumer(mainConsumer);
        taskManager.setMainConsumer(mainConsumer);
        StreamThread streamThread = new StreamThread(time, config, adminClient, mainConsumer, restoreConsumer, changelogReader, originalReset, taskManager, streamsMetrics, builder, threadId, logContext, assignmentErrorCode, nextScheduledRebalanceMs);
        taskManager.setPartitionResetter(partitions -> streamThread.resetOffsets((Set<TopicPartition>)partitions, null));
        return streamThread.updateThreadMetadata(ClientUtils.getSharedAdminClientId(clientId));
    }

    public static ProcessingMode processingMode(StreamsConfig config) {
        if ("exactly_once".equals(config.getString("processing.guarantee"))) {
            return ProcessingMode.EXACTLY_ONCE_ALPHA;
        }
        if ("exactly_once_beta".equals(config.getString("processing.guarantee"))) {
            return ProcessingMode.EXACTLY_ONCE_BETA;
        }
        return ProcessingMode.AT_LEAST_ONCE;
    }

    public static boolean eosEnabled(StreamsConfig config) {
        ProcessingMode processingMode = StreamThread.processingMode(config);
        return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA;
    }

    public StreamThread(Time time, StreamsConfig config, Admin adminClient, Consumer<byte[], byte[]> mainConsumer, Consumer<byte[], byte[]> restoreConsumer, ChangelogReader changelogReader, String originalReset, TaskManager taskManager, StreamsMetricsImpl streamsMetrics, InternalTopologyBuilder builder, String threadId, LogContext logContext, AtomicInteger assignmentErrorCode, AtomicLong nextProbingRebalanceMs) {
        super(threadId);
        this.stateLock = new Object();
        this.adminClient = adminClient;
        this.streamsMetrics = streamsMetrics;
        this.commitSensor = ThreadMetrics.commitSensor(threadId, streamsMetrics);
        this.pollSensor = ThreadMetrics.pollSensor(threadId, streamsMetrics);
        this.pollRecordsSensor = ThreadMetrics.pollRecordsSensor(threadId, streamsMetrics);
        this.pollRatioSensor = ThreadMetrics.pollRatioSensor(threadId, streamsMetrics);
        this.processLatencySensor = ThreadMetrics.processLatencySensor(threadId, streamsMetrics);
        this.processRecordsSensor = ThreadMetrics.processRecordsSensor(threadId, streamsMetrics);
        this.processRateSensor = ThreadMetrics.processRateSensor(threadId, streamsMetrics);
        this.processRatioSensor = ThreadMetrics.processRatioSensor(threadId, streamsMetrics);
        this.punctuateSensor = ThreadMetrics.punctuateSensor(threadId, streamsMetrics);
        this.punctuateRatioSensor = ThreadMetrics.punctuateRatioSensor(threadId, streamsMetrics);
        this.commitRatioSensor = ThreadMetrics.commitRatioSensor(threadId, streamsMetrics);
        ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
        ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
        if (streamsMetrics.version() == StreamsMetricsImpl.Version.FROM_0100_TO_24) {
            ThreadMetrics.skipRecordSensor(threadId, streamsMetrics);
            ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
        }
        this.time = time;
        this.builder = builder;
        this.logPrefix = logContext.logPrefix();
        this.log = logContext.logger(StreamThread.class);
        this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, assignmentErrorCode);
        this.taskManager = taskManager;
        this.restoreConsumer = restoreConsumer;
        this.mainConsumer = mainConsumer;
        this.changelogReader = changelogReader;
        this.originalReset = originalReset;
        this.nextProbingRebalanceMs = nextProbingRebalanceMs;
        this.pollTime = Duration.ofMillis(config.getLong("poll.ms"));
        boolean dummyThreadIdx = true;
        this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", 1)).getInt("max.poll.interval.ms");
        this.commitTimeMs = config.getLong("commit.interval.ms");
        this.numIterations = 1;
    }

    @Override
    public void run() {
        this.log.info("Starting");
        if (this.setState(State.STARTING) == null) {
            this.log.info("StreamThread already shutdown. Not running");
            return;
        }
        boolean cleanRun = false;
        try {
            this.runLoop();
            cleanRun = true;
        }
        catch (Exception e) {
            String errorMessage;
            if (e instanceof UnsupportedVersionException && (errorMessage = e.getMessage()) != null && errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
                this.log.error("Shutting down because the Kafka cluster seems to be on a too old version. Setting {}=\"{}\" requires broker version 2.5 or higher.", (Object)"processing.guarantee", (Object)"exactly_once_beta");
                throw e;
            }
            this.log.error("Encountered the following exception during processing and the thread is going to shut down: ", (Throwable)e);
            throw e;
        }
        finally {
            this.completeShutdown(cleanRun);
        }
    }

    void runLoop() {
        this.subscribeConsumer();
        while (this.isRunning() || this.taskManager.isRebalanceInProgress()) {
            try {
                this.runOnce();
                if (this.nextProbingRebalanceMs.get() >= this.time.milliseconds()) continue;
                this.log.info("Triggering the followup rebalance scheduled for {} ms.", (Object)this.nextProbingRebalanceMs.get());
                this.mainConsumer.enforceRebalance();
                this.nextProbingRebalanceMs.set(Long.MAX_VALUE);
            }
            catch (TaskCorruptedException e) {
                this.log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.", (Throwable)((Object)e));
                try {
                    this.taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
                }
                catch (TaskMigratedException taskMigrated) {
                    this.handleTaskMigrated(taskMigrated);
                }
            }
            catch (TaskMigratedException e) {
                this.handleTaskMigrated(e);
            }
        }
    }

    private void handleTaskMigrated(TaskMigratedException e) {
        this.log.warn("Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.", (Throwable)((Object)e));
        this.taskManager.handleLostAll();
        this.mainConsumer.unsubscribe();
        this.subscribeConsumer();
    }

    private void subscribeConsumer() {
        if (this.builder.usesPatternSubscription()) {
            this.mainConsumer.subscribe(this.builder.sourceTopicPattern(), this.rebalanceListener);
        } else {
            this.mainConsumer.subscribe(this.builder.sourceTopicCollection(), this.rebalanceListener);
        }
    }

    void runOnce() {
        ConsumerRecords<byte[], byte[]> records;
        long startMs;
        this.now = startMs = this.time.milliseconds();
        if (this.state == State.PARTITIONS_ASSIGNED) {
            records = this.pollRequests(Duration.ZERO);
        } else if (this.state == State.PARTITIONS_REVOKED) {
            records = this.pollRequests(Duration.ZERO);
        } else if (this.state == State.RUNNING || this.state == State.STARTING) {
            records = this.pollRequests(this.pollTime);
        } else if (this.state == State.PENDING_SHUTDOWN) {
            records = this.pollRequests(Duration.ZERO);
        } else {
            this.log.error("Unexpected state {} during normal iteration", (Object)this.state);
            throw new StreamsException(this.logPrefix + "Unexpected state " + this.state + " during normal iteration");
        }
        long pollLatency = this.advanceNowAndComputeLatency();
        if (records != null && !records.isEmpty()) {
            this.pollSensor.record((double)pollLatency, this.now);
            this.pollRecordsSensor.record((double)records.count(), this.now);
            this.taskManager.addRecordsToTasks(records);
        }
        if (!this.isRunning()) {
            this.log.debug("State already transits to {}, skipping the run once call after poll request", (Object)this.state);
            return;
        }
        if (this.state == State.PARTITIONS_ASSIGNED || this.state == State.RUNNING && this.taskManager.needsInitializationOrRestoration()) {
            this.changelogReader.enforceRestoreActive();
            if (this.taskManager.tryToCompleteRestoration()) {
                this.changelogReader.transitToUpdateStandby();
                this.setState(State.RUNNING);
            }
        }
        this.changelogReader.restore();
        this.advanceNowAndComputeLatency();
        int totalProcessed = 0;
        long totalCommitLatency = 0L;
        long totalProcessLatency = 0L;
        long totalPunctuateLatency = 0L;
        if (this.state == State.RUNNING) {
            while (true) {
                int processed = this.taskManager.process(this.numIterations, this.time);
                long processLatency = this.advanceNowAndComputeLatency();
                totalProcessLatency += processLatency;
                if (processed > 0) {
                    this.processRateSensor.record((double)processed, this.now);
                    this.processLatencySensor.record((double)processLatency / (double)processed, this.now);
                    totalProcessed += processed;
                }
                int punctuated = this.taskManager.punctuate();
                long punctuateLatency = this.advanceNowAndComputeLatency();
                totalPunctuateLatency += punctuateLatency;
                if (punctuated > 0) {
                    this.punctuateSensor.record((double)punctuateLatency / (double)punctuated, this.now);
                }
                long beforeCommitMs = this.now;
                int committed = this.maybeCommit();
                long commitLatency = Math.max(this.now - beforeCommitMs, 0L);
                totalCommitLatency += commitLatency;
                if (committed > 0) {
                    this.commitSensor.record((double)commitLatency / (double)committed, this.now);
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Committed all active tasks {} and standby tasks {} in {}ms", new Object[]{this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), commitLatency});
                    }
                }
                if (processed == 0) break;
                if (Math.max(this.now - this.lastPollMs, 0L) > (long)(this.maxPollTimeMs / 2)) {
                    this.numIterations = this.numIterations > 1 ? this.numIterations / 2 : this.numIterations;
                    break;
                }
                if (punctuated > 0 || committed > 0) {
                    this.numIterations = this.numIterations > 1 ? this.numIterations / 2 : this.numIterations;
                    continue;
                }
                ++this.numIterations;
            }
            this.taskManager.recordTaskProcessRatio(totalProcessLatency, this.now);
        }
        this.now = this.time.milliseconds();
        long runOnceLatency = this.now - startMs;
        this.processRecordsSensor.record((double)totalProcessed, this.now);
        this.processRatioSensor.record((double)totalProcessLatency / (double)runOnceLatency, this.now);
        this.punctuateRatioSensor.record((double)totalPunctuateLatency / (double)runOnceLatency, this.now);
        this.pollRatioSensor.record((double)pollLatency / (double)runOnceLatency, this.now);
        this.commitRatioSensor.record((double)totalCommitLatency / (double)runOnceLatency, this.now);
    }

    private ConsumerRecords<byte[], byte[]> pollRequests(Duration pollTime) {
        ConsumerRecords records = null;
        this.lastPollMs = this.now;
        try {
            records = this.mainConsumer.poll(pollTime);
        }
        catch (InvalidOffsetException e) {
            this.resetOffsets(e.partitions(), (Exception)((Object)e));
        }
        return records;
    }

    private void resetOffsets(Set<TopicPartition> partitions, Exception cause) {
        HashSet<String> loggedTopics = new HashSet<String>();
        HashSet<TopicPartition> seekToBeginning = new HashSet<TopicPartition>();
        HashSet<TopicPartition> seekToEnd = new HashSet<TopicPartition>();
        HashSet<TopicPartition> notReset = new HashSet<TopicPartition>();
        for (TopicPartition partition : partitions) {
            if (this.builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
                this.addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics);
                continue;
            }
            if (this.builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
                this.addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
                continue;
            }
            if ("earliest".equals(this.originalReset)) {
                this.addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
                continue;
            }
            if ("latest".equals(this.originalReset)) {
                this.addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
                continue;
            }
            notReset.add(partition);
        }
        if (notReset.isEmpty()) {
            if (!seekToBeginning.isEmpty()) {
                this.mainConsumer.seekToBeginning(seekToBeginning);
            }
            if (!seekToEnd.isEmpty()) {
                this.mainConsumer.seekToEnd(seekToEnd);
            }
        } else {
            String notResetString = notReset.stream().map(TopicPartition::topic).distinct().collect(Collectors.joining(","));
            String format = String.format("No valid committed offset found for input [%s] and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))", notResetString);
            if (cause == null) {
                throw new StreamsException(format);
            }
            throw new StreamsException(format, cause);
        }
    }

    private void addToResetList(TopicPartition partition, Set<TopicPartition> partitions, String logMessage, String resetPolicy, Set<String> loggedTopics) {
        String topic = partition.topic();
        if (loggedTopics.add(topic)) {
            this.log.info(logMessage, (Object)topic, (Object)resetPolicy);
        }
        partitions.add(partition);
    }

    int maybeCommit() {
        int committed;
        if (this.now - this.lastCommitMs > this.commitTimeMs) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", new Object[]{this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.now - this.lastCommitMs, this.commitTimeMs});
            }
            if ((committed = this.taskManager.commit(this.taskManager.tasks().values().stream().filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING).collect(Collectors.toSet()))) > 0) {
                this.taskManager.maybePurgeCommittedRecords();
            }
            if (committed == -1) {
                this.log.trace("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
            } else {
                this.advanceNowAndComputeLatency();
                this.lastCommitMs = this.now;
            }
        } else {
            committed = this.taskManager.maybeCommitActiveTasksPerUserRequested();
        }
        return committed;
    }

    private long advanceNowAndComputeLatency() {
        long previous = this.now;
        this.now = this.time.milliseconds();
        return Math.max(this.now - previous, 0L);
    }

    public void shutdown() {
        this.log.info("Informed to shut down");
        State oldState = this.setState(State.PENDING_SHUTDOWN);
        if (oldState == State.CREATED) {
            this.completeShutdown(true);
        }
    }

    private void completeShutdown(boolean cleanRun) {
        this.setState(State.PENDING_SHUTDOWN);
        this.log.info("Shutting down");
        try {
            this.taskManager.shutdown(cleanRun);
        }
        catch (Throwable e) {
            this.log.error("Failed to close task manager due to the following error:", e);
        }
        try {
            this.changelogReader.clear();
        }
        catch (Throwable e) {
            this.log.error("Failed to close changelog reader due to the following error:", e);
        }
        try {
            this.mainConsumer.close();
        }
        catch (Throwable e) {
            this.log.error("Failed to close consumer due to the following error:", e);
        }
        try {
            this.restoreConsumer.close();
        }
        catch (Throwable e) {
            this.log.error("Failed to close restore consumer due to the following error:", e);
        }
        this.streamsMetrics.removeAllThreadLevelSensors(this.getName());
        this.setState(State.DEAD);
        this.log.info("Shutdown complete");
    }

    public final ThreadMetadata threadMetadata() {
        return this.threadMetadata;
    }

    StreamThread updateThreadMetadata(String adminClientId) {
        this.threadMetadata = new ThreadMetadata(this.getName(), this.state().name(), ClientUtils.getConsumerClientId(this.getName()), ClientUtils.getRestoreConsumerClientId(this.getName()), this.taskManager.producerClientIds(), adminClientId, Collections.emptySet(), Collections.emptySet());
        return this;
    }

    private void updateThreadMetadata(Map<TaskId, Task> activeTasks, Map<TaskId, Task> standbyTasks) {
        HashSet<TaskMetadata> activeTasksMetadata = new HashSet<TaskMetadata>();
        for (Map.Entry<TaskId, Task> entry : activeTasks.entrySet()) {
            activeTasksMetadata.add(new TaskMetadata(entry.getKey().toString(), entry.getValue().inputPartitions()));
        }
        HashSet<TaskMetadata> standbyTasksMetadata = new HashSet<TaskMetadata>();
        for (Map.Entry<TaskId, Task> entry : standbyTasks.entrySet()) {
            standbyTasksMetadata.add(new TaskMetadata(entry.getKey().toString(), entry.getValue().inputPartitions()));
        }
        String string = this.threadMetadata.adminClientId();
        this.threadMetadata = new ThreadMetadata(this.getName(), this.state().name(), ClientUtils.getConsumerClientId(this.getName()), ClientUtils.getRestoreConsumerClientId(this.getName()), this.taskManager.producerClientIds(), string, activeTasksMetadata, standbyTasksMetadata);
    }

    public Map<TaskId, Task> activeTaskMap() {
        return this.taskManager.activeTaskMap();
    }

    public List<Task> activeTasks() {
        return this.taskManager.activeTaskIterable();
    }

    public Map<TaskId, Task> allTasks() {
        return this.taskManager.tasks();
    }

    @Override
    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        return indent + "\tStreamsThread threadId: " + this.getName() + "\n" + this.taskManager.toString(indent);
    }

    public Map<MetricName, Metric> producerMetrics() {
        return this.taskManager.producerMetrics();
    }

    public Map<MetricName, Metric> consumerMetrics() {
        return ClientUtils.consumerMetrics(this.mainConsumer, this.restoreConsumer);
    }

    public Map<MetricName, Metric> adminClientMetrics() {
        return ClientUtils.adminClientMetrics(this.adminClient);
    }

    void setNow(long now) {
        this.now = now;
    }

    TaskManager taskManager() {
        return this.taskManager;
    }

    int currentNumIterations() {
        return this.numIterations;
    }

    ConsumerRebalanceListener rebalanceListener() {
        return this.rebalanceListener;
    }

    Consumer<byte[], byte[]> mainConsumer() {
        return this.mainConsumer;
    }

    Consumer<byte[], byte[]> restoreConsumer() {
        return this.restoreConsumer;
    }

    Admin adminClient() {
        return this.adminClient;
    }

    InternalTopologyBuilder internalTopologyBuilder() {
        return this.builder;
    }

    private static final class InternalConsumerConfig
    extends ConsumerConfig {
        private InternalConsumerConfig(Map<String, Object> props) {
            super(ConsumerConfig.addDeserializerToConfig(props, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()), false);
        }
    }

    public static enum ProcessingMode {
        AT_LEAST_ONCE("AT_LEAST_ONCE"),
        EXACTLY_ONCE_ALPHA("EXACTLY_ONCE_ALPHA"),
        EXACTLY_ONCE_BETA("EXACTLY_ONCE_BETA");

        public final String name;

        private ProcessingMode(String name) {
            this.name = name;
        }
    }

    public static interface StateListener {
        public void onChange(Thread var1, ThreadStateTransitionValidator var2, ThreadStateTransitionValidator var3);
    }

    public static enum State implements ThreadStateTransitionValidator
    {
        CREATED(1, 5),
        STARTING(2, 3, 5),
        PARTITIONS_REVOKED(2, 3, 5),
        PARTITIONS_ASSIGNED(2, 3, 4, 5),
        RUNNING(2, 3, 4, 5),
        PENDING_SHUTDOWN(6),
        DEAD(new Integer[0]);

        private final Set<Integer> validTransitions = new HashSet<Integer>();

        private State(Integer ... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        public boolean isAlive() {
            return this.equals(RUNNING) || this.equals(STARTING) || this.equals(PARTITIONS_REVOKED) || this.equals(PARTITIONS_ASSIGNED);
        }

        @Override
        public boolean isValidTransition(ThreadStateTransitionValidator newState) {
            State tmpState = (State)newState;
            return this.validTransitions.contains(tmpState.ordinal());
        }
    }
}

