/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.ProcessorTopologyFactories;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockRestoreConsumer;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.verification.VerificationMode;

@ExtendWith(value={MockitoExtension.class})
@MockitoSettings(strictness=Strictness.STRICT_STUBS)
public class StandbyTaskTest {
    // Fixed identifiers shared by every test case.
    private final String threadName = "threadName";
    private final String threadId = Thread.currentThread().getName();
    private final TaskId taskId = new TaskId(0, 0, "My-Topology");
    private final String storeName1 = "store1";
    private final String storeName2 = "store2";
    private final String applicationId = "test-application";

    // Changelog topic names derived from the application id, the store name and the task's topology name.
    private final String storeChangelogTopicName1 =
        ProcessorStateManager.storeChangelogTopic(applicationId, storeName1, taskId.topologyName());
    private final String storeChangelogTopicName2 =
        ProcessorStateManager.storeChangelogTopic(applicationId, storeName2, taskId.topologyName());
    private final TopicPartition partition = new TopicPartition(storeChangelogTopicName1, 0);

    // Two mock stores (built with different boolean flags) wired into a topology of local stores.
    private final MockKeyValueStore store1 = new MockKeyValueStoreBuilder(storeName1, false).build();
    private final MockKeyValueStore store2 = new MockKeyValueStoreBuilder(storeName2, true).build();
    private final ProcessorTopology topology = ProcessorTopologyFactories.withLocalStores(
        Arrays.asList(store1, store2),
        Utils.mkMap(
            Utils.mkEntry(storeName1, storeChangelogTopicName1),
            Utils.mkEntry(storeName2, storeChangelogTopicName2)));

    // Metrics registry driven by a mock clock and recording at DEBUG level.
    private final MockTime time = new MockTime();
    private final Metrics metrics =
        new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
    private final StreamsMetricsImpl streamsMetrics =
        new StreamsMetricsImpl(metrics, threadName, "latest", time);

    // Per-test state: baseDir, config and stateDirectory are assigned in setup(); task is assigned by each test.
    private File baseDir;
    private StreamsConfig config;
    private StateDirectory stateDirectory;
    private StandbyTask task;
    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer =
        new MockRestoreConsumer<>(new IntegerSerializer(), new IntegerSerializer());

    // State manager collaborator, replaced by a Mockito mock.
    @Mock
    private ProcessorStateManager stateManager;

    /**
     * Builds the {@link StreamsConfig} used by the tests, with {@code state.dir} pointing at {@code baseDir}.
     *
     * @param baseDir directory whose canonical path becomes the {@code state.dir} setting
     * @return the assembled configuration
     * @throws IOException if the canonical path of {@code baseDir} cannot be resolved
     */
    private StreamsConfig createConfig(File baseDir) throws IOException {
        Map<String, String> settings = Utils.mkMap(
            Utils.mkEntry("application.id", "test-application"),
            Utils.mkEntry("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name),
            Utils.mkEntry("bootstrap.servers", "localhost:2171"),
            Utils.mkEntry("buffered.records.per.partition", "3"),
            Utils.mkEntry("state.dir", baseDir.getCanonicalPath()),
            Utils.mkEntry("default.timestamp.extractor", MockTimestampExtractor.class.getName()));
        return new StreamsConfig(Utils.mkProperties(settings));
    }

    /**
     * Prepares the fixture before each test: stubs the state manager's task id and type, resets the
     * restore consumer and registers partitions 0..2 for both changelog topics, then creates a fresh
     * temporary state directory and the config that points at it.
     */
    @BeforeEach
    public void setup() throws Exception {
        Mockito.when(stateManager.taskId()).thenReturn(taskId);
        Mockito.when(stateManager.taskType()).thenReturn(Task.TaskType.STANDBY);

        restoreStateConsumer.reset();
        // Same three-partition layout for each changelog topic, registered in the original order.
        for (String changelogTopic : Arrays.asList(storeChangelogTopicName1, storeChangelogTopicName2)) {
            restoreStateConsumer.updatePartitions(changelogTopic, Arrays.asList(
                new PartitionInfo(changelogTopic, 0, Node.noNode(), new Node[0], new Node[0]),
                new PartitionInfo(changelogTopic, 1, Node.noNode(), new Node[0], new Node[0]),
                new PartitionInfo(changelogTopic, 2, Node.noNode(), new Node[0], new Node[0])));
        }

        baseDir = TestUtils.tempDirectory();
        config = createConfig(baseDir);
        stateDirectory = new StateDirectory(config, new MockTime(), true, true);
    }

    /**
     * Tears down the fixture after each test.
     *
     * <p>If a task was created it is suspended and then closed dirty. An {@link IllegalStateException}
     * from {@code suspend()} is tolerated only when its message says the task is already CLOSED; any
     * other failure (including one without a message) is rethrown unchanged.
     *
     * <p>The task reference is cleared and the temporary base directory is deleted in a {@code finally}
     * block, so a failing suspend/close neither leaks the directory nor leaves a stale task behind.
     *
     * @throws IOException if the base directory cannot be deleted
     */
    @AfterEach
    public void cleanup() throws IOException {
        try {
            if (task != null) {
                try {
                    task.suspend();
                } catch (IllegalStateException maybeSwallow) {
                    // Null-safe check: an exception without a message must not be hidden behind an NPE.
                    String message = maybeSwallow.getMessage();
                    boolean alreadyClosed = message != null
                        && message.startsWith("Illegal state CLOSED while suspending standby task");
                    if (!alreadyClosed) {
                        throw maybeSwallow;
                    }
                }
                task.closeDirty();
            }
        } finally {
            task = null;
            Utils.delete(baseDir);
        }
    }

    /**
     * Initialization must fail with a {@link LockException} when the state directory reports that
     * the task lock could not be taken.
     */
    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
        stateDirectory = Mockito.mock(StateDirectory.class);
        Mockito.when(stateDirectory.lock(taskId)).thenReturn(false);
        Mockito.when(stateManager.taskType()).thenReturn(Task.TaskType.STANDBY);

        task = createStandbyTask();

        Assertions.assertThrows(LockException.class, () -> task.initializeIfNeeded());

        // Clearing the reference makes cleanup() skip its suspend/closeDirty step for this task.
        task = null;
    }

    /**
     * A new task starts in CREATED, moves to RUNNING on the first initialization and stays in
     * RUNNING when initialization is requested again.
     */
    @Test
    public void shouldTransitToRunningAfterInitialization() {
        Mockito.doNothing().when(stateManager).registerStateStores(ArgumentMatchers.any(), ArgumentMatchers.any());

        task = createStandbyTask();
        Assertions.assertEquals(Task.State.CREATED, task.state());

        task.initializeIfNeeded();
        Assertions.assertEquals(Task.State.RUNNING, task.state());

        // A second call must leave the state untouched.
        task.initializeIfNeeded();
        Assertions.assertEquals(Task.State.RUNNING, task.state());
    }

    /**
     * Preparing a commit on a task that has been suspended and closed cleanly must be rejected
     * with an {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowIfCommittingOnIllegalState() {
        task = createStandbyTask();
        task.suspend();
        task.closeClean();

        Assertions.assertThrows(IllegalStateException.class, () -> task.prepareCommit());
    }

    /**
     * With enforcement requested, {@code maybeCheckpoint(true)} must flush and checkpoint the state
     * manager even though the stubbed changelog offsets are empty.
     */
    @Test
    public void shouldAlwaysCheckpointStateIfEnforced() {
        Mockito.when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());

        task = createStandbyTask();
        task.initializeIfNeeded();
        task.maybeCheckpoint(true);

        Mockito.verify(stateManager).flush();
        Mockito.verify(stateManager).checkpoint();
    }

    /**
     * Without enforcement, three successive {@code maybeCheckpoint(false)} calls see offsets
     * 50, 11000 and 12000. The offset snapshot is expected to be empty after the first call and to
     * hold 11000 after the second and third, with exactly one flush and one checkpoint overall.
     */
    @Test
    public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
        Map<TopicPartition, Long> snapshotAfterCheckpoint = Collections.singletonMap(partition, 11000L);
        Mockito.when(stateManager.changelogOffsets())
            .thenReturn(Collections.singletonMap(partition, 50L))
            .thenReturn(Collections.singletonMap(partition, 11000L))
            .thenReturn(Collections.singletonMap(partition, 12000L));

        task = createStandbyTask();
        task.initializeIfNeeded();

        task.maybeCheckpoint(false);
        Assertions.assertTrue(task.offsetSnapshotSinceLastFlush.isEmpty());

        task.maybeCheckpoint(false);
        Assertions.assertEquals(snapshotAfterCheckpoint, task.offsetSnapshotSinceLastFlush);

        task.maybeCheckpoint(false);
        Assertions.assertEquals(snapshotAfterCheckpoint, task.offsetSnapshotSinceLastFlush);

        Mockito.verify(stateManager).flush();
        Mockito.verify(stateManager).checkpoint();
    }

    /**
     * Runs three non-enforced commit cycles while the stubbed changelog offsets read 50, 11000 and
     * 11000, and expects the state manager to be checkpointed exactly once.
     *
     * <p>NOTE(review): presumably only the 50 -> 11000 advance is large enough to trigger the
     * checkpoint; confirm the threshold against {@code StandbyTask}.
     */
    @Test
    public void shouldFlushAndCheckpointStateManagerOnCommit() {
        Mockito.doNothing().when(stateManager).flush();
        // Single authoritative stubbing; an earlier empty-map stubbing was immediately overridden
        // by this one and therefore never observed by the task.
        Mockito.when(stateManager.changelogOffsets())
            .thenReturn(Collections.singletonMap(partition, 50L))
            .thenReturn(Collections.singletonMap(partition, 11000L))
            .thenReturn(Collections.singletonMap(partition, 11000L));

        task = createStandbyTask();
        task.initializeIfNeeded();

        for (int cycle = 0; cycle < 3; cycle++) {
            task.prepareCommit();
            task.postCommit(false);
        }

        Mockito.verify(stateManager).checkpoint();
    }

    /**
     * The task's {@code changelogOffsets()} must return the offsets reported by the state manager.
     */
    @Test
    public void shouldReturnStateManagerChangelogOffsets() {
        Map<TopicPartition, Long> offsets = Collections.singletonMap(partition, 50L);
        Mockito.when(stateManager.changelogOffsets()).thenReturn(offsets);

        task = createStandbyTask();

        Assertions.assertEquals(Collections.singletonMap(partition, 50L), task.changelogOffsets());
    }

    /**
     * A dirty close must swallow a {@link ProcessorStateException} thrown by the state manager's
     * {@code close()}, end in CLOSED, record one task close, and never flush or checkpoint.
     */
    @Test
    public void shouldNotFlushAndThrowOnCloseDirty() {
        Mockito.doThrow(new ProcessorStateException("KABOOM!")).when(stateManager).close();
        MetricName metricName = setupCloseTaskMetric();

        task = createStandbyTask();
        task.initializeIfNeeded();
        task.suspend();
        task.closeDirty();

        Assertions.assertEquals(Task.State.CLOSED, task.state());

        double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        Mockito.verify(stateManager, Mockito.never()).flush();
        Mockito.verify(stateManager, Mockito.never()).checkpoint();
    }

    /**
     * A {@link RuntimeException} thrown by the state manager's {@code close()} must not escape
     * from {@code closeDirty()}; the test passes when no exception propagates.
     */
    @Test
    public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
        Mockito.doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();

        task = createStandbyTask();
        task.initializeIfNeeded();
        task.suspend();

        task.closeDirty();
    }

    /**
     * Suspending, committing with an enforced checkpoint and then closing cleanly must end in
     * CLOSED, record one task close and checkpoint the state manager once.
     */
    @Test
    public void shouldSuspendAndCommitBeforeCloseClean() {
        Mockito.doNothing().when(stateManager).close();
        Mockito.when(stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(partition, 60L));
        MetricName metricName = setupCloseTaskMetric();

        task = createStandbyTask();
        task.initializeIfNeeded();
        task.suspend();
        task.prepareCommit();
        task.postCommit(true);
        task.closeClean();

        Assertions.assertEquals(Task.State.CLOSED, task.state());

        double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        Mockito.verify(stateManager).checkpoint();
    }

    /**
     * A task still in CREATED cannot be closed cleanly until it has been suspended: the first
     * {@code closeClean()} must throw, the one after {@code suspend()} must succeed.
     */
    @Test
    public void shouldRequireSuspendingCreatedTasksBeforeClose() {
        task = createStandbyTask();
        MatcherAssert.assertThat(task.state(), CoreMatchers.equalTo(Task.State.CREATED));

        Assertions.assertThrows(IllegalStateException.class, () -> task.closeClean());

        task.suspend();
        task.closeClean();
    }

    /**
     * {@code commitNeeded()} is expected to be false while the stubbed offset reads 50 and true once
     * it reads 10100; the test then completes a commit cycle with an enforced checkpoint.
     */
    @Test
    public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() {
        Mockito.when(stateManager.changelogOffsets())
            .thenReturn(Collections.singletonMap(partition, 50L))
            .thenReturn(Collections.singletonMap(partition, 10100L));
        Mockito.doNothing().when(stateManager).flush();
        Mockito.doNothing().when(stateManager).checkpoint();

        task = createStandbyTask();
        task.initializeIfNeeded();

        // Each commitNeeded() call consumes the next stubbed offset map.
        Assertions.assertFalse(task.commitNeeded());
        Assertions.assertTrue(task.commitNeeded());

        task.prepareCommit();
        task.postCommit(true);
    }

    /**
     * A {@link RuntimeException} from the state manager's {@code close()} must propagate out of
     * {@code closeClean()}, and no task close may be recorded.
     */
    @Test
    public void shouldThrowOnCloseCleanError() {
        Mockito.doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
        MetricName metricName = setupCloseTaskMetric();

        task = createStandbyTask();
        task.initializeIfNeeded();
        task.suspend();

        Assertions.assertThrows(RuntimeException.class, () -> task.closeClean());

        double expectedCloseTaskMetric = 0.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
    }

    /**
     * A {@link RuntimeException} from the state manager's {@code checkpoint()} must propagate out of
     * {@code postCommit(true)}; the task must stay in RUNNING and no task close may be recorded.
     */
    @Test
    public void shouldThrowOnCloseCleanCheckpointError() {
        Mockito.when(stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(partition, 50L));
        Mockito.doThrow(new RuntimeException("KABOOM!")).when(stateManager).checkpoint();
        MetricName metricName = setupCloseTaskMetric();

        task = createStandbyTask();
        task.initializeIfNeeded();
        task.prepareCommit();

        Assertions.assertThrows(RuntimeException.class, () -> task.postCommit(true));
        Assertions.assertEquals(Task.State.RUNNING, task.state());

        double expectedCloseTaskMetric = 0.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
    }

    /**
     * After a clean close, no metric tagged with {@code task-id} may remain registered.
     */
    @Test
    public void shouldUnregisterMetricsInCloseClean() {
        task = createStandbyTask();
        task.initializeIfNeeded();
        task.suspend();

        task.closeClean();

        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    /**
     * After a dirty close, no metric tagged with {@code task-id} may remain registered.
     */
    @Test
    public void shouldUnregisterMetricsInCloseDirty() {
        task = createStandbyTask();
        task.initializeIfNeeded();
        task.suspend();

        task.closeDirty();

        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    /**
     * A task that was never initialized (suspended straight from CREATED) and then closed dirty must
     * record one task close and end in CLOSED. The {@code close()} stubbing under strict stubs also
     * requires that the state manager is actually closed.
     */
    @Test
    public void shouldCloseStateManagerOnTaskCreated() {
        Mockito.doNothing().when(stateManager).close();
        MetricName metricName = setupCloseTaskMetric();

        task = createStandbyTask();
        task.suspend();
        task.closeDirty();

        double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        Assertions.assertEquals(Task.State.CLOSED, task.state());
    }

    /**
     * With {@code processing.guarantee=exactly_once}, a never-initialized task that is suspended and
     * closed dirty must record one task close and end in CLOSED.
     *
     * <p>NOTE(review): the test name mentions deleting the state directory, but nothing here asserts
     * it directly; the only link is the strict {@code baseDir()} stubbing, which must be consumed.
     */
    @Test
    public void shouldDeleteStateDirOnTaskCreatedAndEosAlphaUncleanClose() {
        Mockito.doNothing().when(stateManager).close();
        Mockito.when(stateManager.baseDir()).thenReturn(baseDir);
        MetricName metricName = setupCloseTaskMetric();

        config = new StreamsConfig(Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", "test-application"),
            Utils.mkEntry("bootstrap.servers", "localhost:2171"),
            Utils.mkEntry("processing.guarantee", "exactly_once"))));

        task = createStandbyTask();
        task.suspend();
        task.closeDirty();

        double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        Assertions.assertEquals(Task.State.CLOSED, task.state());
    }

    /**
     * With {@code processing.guarantee=exactly_once_v2}, a never-initialized task that is suspended
     * and closed dirty must record one task close and end in CLOSED.
     *
     * <p>NOTE(review): the test name mentions deleting the state directory, but nothing here asserts
     * it directly; the only link is the strict {@code baseDir()} stubbing, which must be consumed.
     */
    @Test
    public void shouldDeleteStateDirOnTaskCreatedAndEosV2UncleanClose() {
        Mockito.doNothing().when(stateManager).close();
        Mockito.when(stateManager.baseDir()).thenReturn(baseDir);
        MetricName metricName = setupCloseTaskMetric();

        config = new StreamsConfig(Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", "test-application"),
            Utils.mkEntry("bootstrap.servers", "localhost:2171"),
            Utils.mkEntry("processing.guarantee", "exactly_once_v2"))));

        task = createStandbyTask();
        task.suspend();
        task.closeDirty();

        double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        Assertions.assertEquals(Task.State.CLOSED, task.state());
    }

    /**
     * {@code prepareRecycle()} must be rejected in CREATED and in RUNNING, and must succeed once the
     * task is suspended: the task ends in CLOSED, its task-level metrics are gone and the state
     * manager has been recycled.
     */
    @Test
    public void shouldPrepareRecycleSuspendedTask() {
        task = createStandbyTask();
        Assertions.assertThrows(IllegalStateException.class, () -> task.prepareRecycle());

        task.initializeIfNeeded();
        Assertions.assertThrows(IllegalStateException.class, () -> task.prepareRecycle());

        task.suspend();
        task.prepareRecycle();

        MatcherAssert.assertThat(task.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
        Mockito.verify(stateManager).recycle();
    }

    /**
     * Suspending a task that is still in CREATED must move it to SUSPENDED.
     */
    @Test
    public void shouldAlwaysSuspendCreatedTasks() {
        task = createStandbyTask();
        MatcherAssert.assertThat(task.state(), CoreMatchers.equalTo(Task.State.CREATED));

        task.suspend();

        MatcherAssert.assertThat(task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    /**
     * Suspending a task that is in RUNNING must move it to SUSPENDED.
     */
    @Test
    public void shouldAlwaysSuspendRunningTasks() {
        task = createStandbyTask();
        task.initializeIfNeeded();
        MatcherAssert.assertThat(task.state(), CoreMatchers.equalTo(Task.State.RUNNING));

        task.suspend();

        MatcherAssert.assertThat(task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    /**
     * After the timeout is initialized at time 0, a check at exactly five minutes must pass and a
     * check one millisecond later must throw a {@link StreamsException} caused by a
     * {@link TimeoutException}.
     */
    @Test
    public void shouldInitTaskTimeoutAndEventuallyThrow() {
        long fiveMinutesMs = Duration.ofMinutes(5L).toMillis();
        long justPastFiveMinutesMs = Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis();

        task = createStandbyTask();
        task.maybeInitTaskTimeoutOrThrow(0L, null);
        task.maybeInitTaskTimeoutOrThrow(fiveMinutesMs, null);

        StreamsException thrown = Assertions.assertThrows(
            StreamsException.class,
            () -> task.maybeInitTaskTimeoutOrThrow(justPastFiveMinutesMs, null));

        MatcherAssert.assertThat(thrown.getCause(), Matchers.isA(TimeoutException.class));
    }

    /**
     * Once the task timeout has been cleared, a later check past the five-minute mark must not
     * throw; the test passes when no exception propagates.
     */
    @Test
    public void shouldClearTaskTimeout() {
        long justPastFiveMinutesMs = Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis();

        task = createStandbyTask();
        task.maybeInitTaskTimeoutOrThrow(0L, null);
        task.clearTaskTimeout();

        task.maybeInitTaskTimeoutOrThrow(justPastFiveMinutesMs, null);
    }

    /**
     * Recording restorations of 25 and then 50 records must raise the {@code update-total} metric
     * from 0 to 25 to 75, and make the {@code update-rate} metric non-zero.
     */
    @Test
    public void shouldRecordRestoredRecords() {
        task = createStandbyTask();

        String id = task.id().toString();
        KafkaMetric totalMetric = getMetric("update", "%s-total", id);
        KafkaMetric rateMetric = getMetric("update", "%s-rate", id);

        // Nothing has been recorded yet.
        MatcherAssert.assertThat(totalMetric.metricValue(), CoreMatchers.equalTo(0.0));
        MatcherAssert.assertThat(rateMetric.metricValue(), CoreMatchers.equalTo(0.0));

        task.recordRestoration(time, 25L, false);
        MatcherAssert.assertThat(totalMetric.metricValue(), CoreMatchers.equalTo(25.0));
        MatcherAssert.assertThat(rateMetric.metricValue(), Matchers.not(0.0));

        task.recordRestoration(time, 50L, false);
        MatcherAssert.assertThat(totalMetric.metricValue(), CoreMatchers.equalTo(75.0));
        MatcherAssert.assertThat(rateMetric.metricValue(), Matchers.not(0.0));
    }

    /**
     * Looks up a task-level metric in the test's metrics registry.
     *
     * @param operation  operation name substituted into {@code nameFormat}
     * @param nameFormat format string producing the metric name, e.g. {@code "%s-total"}
     * @param taskId     value of the {@code task-id} tag
     * @return the registered metric, or {@code null} if the registry holds no metric under that name
     */
    private KafkaMetric getMetric(String operation, String nameFormat, String taskId) {
        // NOTE(review): the local's name suggests the description plays no part in the lookup;
        // confirm against MetricName's equality contract.
        String descriptionIsNotVerified = "";
        MetricName lookupName = metrics.metricName(
            String.format(nameFormat, operation),
            "stream-task-metrics",
            descriptionIsNotVerified,
            Utils.mkMap(
                Utils.mkEntry("task-id", taskId),
                Utils.mkEntry("thread-id", Thread.currentThread().getName())));
        return metrics.metrics().get(lookupName);
    }

    /**
     * Creates the {@link StandbyTask} under test from the current fixture fields (config, state
     * directory, mocked state manager), giving it a zero-sized thread cache and a fresh processor
     * context.
     *
     * @return a new standby task for {@code taskId} assigned the single changelog {@code partition}
     */
    private StandbyTask createStandbyTask() {
        String logPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
        ThreadCache cache = new ThreadCache(new LogContext(logPrefix), 0L, streamsMetrics);
        ProcessorContextImpl context =
            new ProcessorContextImpl(taskId, config, stateManager, streamsMetrics, cache);

        return new StandbyTask(
            taskId,
            Collections.singleton(partition),
            topology,
            new TopologyConfig(config).getTaskConfig(),
            streamsMetrics,
            stateManager,
            stateDirectory,
            cache,
            context);
    }

    /**
     * Attaches a cumulative-sum metric to the thread-level {@code task-closed} sensor so tests can
     * read how many closes were recorded.
     *
     * @return the name under which the metric was registered
     */
    private MetricName setupCloseTaskMetric() {
        MetricName closeMetricName = new MetricName("name", "group", "description", Collections.emptyMap());
        Sensor closeSensor =
            streamsMetrics.threadLevelSensor(threadId, "task-closed", Sensor.RecordingLevel.INFO);
        closeSensor.add(closeMetricName, new CumulativeSum());
        return closeMetricName;
    }

    /**
     * Asserts that the metric registered under {@code metricName} currently measures {@code expected}.
     *
     * @param expected       expected measured value
     * @param streamsMetrics metrics instance to read the metric from
     * @param metricName     name of the metric to check
     */
    private void verifyCloseTaskMetric(double expected, StreamsMetricsImpl streamsMetrics, MetricName metricName) {
        KafkaMetric closeMetric = (KafkaMetric) streamsMetrics.metrics().get(metricName);
        double totalCloses = closeMetric.measurable().measure(closeMetric.config(), System.currentTimeMillis());
        MatcherAssert.assertThat(totalCloses, CoreMatchers.equalTo(expected));
    }

    /**
     * Collects the names of all registered metrics that carry a {@code task-id} tag.
     *
     * @return names of the task-level metrics currently registered
     */
    private List<MetricName> getTaskMetrics() {
        return streamsMetrics.metrics()
            .keySet()
            .stream()
            .filter(name -> name.tags().containsKey("task-id"))
            .collect(Collectors.toList());
    }
}

