/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.sdk.metrics.BoundedTrie;
import org.apache.beam.sdk.metrics.BoundedTrieResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(value=JUnit4.class)
public class ExecutionStateSamplerTest {
    private static final Counter TEST_USER_COUNTER = Metrics.counter((String)"foo", (String)"counter");
    private static final Distribution TEST_USER_DISTRIBUTION = Metrics.distribution((String)"foo", (String)"distribution");
    private static final Gauge TEST_USER_GAUGE = Metrics.gauge((String)"foo", (String)"gauge");
    private static final StringSet TEST_USER_STRING_SET = Metrics.stringSet((String)"foo", (String)"stringset");
    private static final BoundedTrie TEST_USER_BOUNDED_TRIE = Metrics.boundedTrie((String)"foo", (String)"boundedtrie");
    private static final Histogram TEST_USER_HISTOGRAM = new DelegatingHistogram(MetricName.named((String)"foo", (String)"histogram"), (HistogramData.BucketType)HistogramData.LinearBuckets.of((double)0.0, (double)100.0, (int)1), false);
    @Rule
    public ExpectedLogs expectedLogs = ExpectedLogs.none(ExecutionStateSampler.class);

    @After
    public void tearDown() {
        MetricsEnvironment.setCurrentContainer(null);
    }

    @Test
    public void testSamplingProducesCorrectFinalResults() throws Exception {
        DateTimeUtils.MillisProvider clock = Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler sampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs((String[])new String[]{"--experiments=state_sampling_period_millis=10"}).create(), clock);
        ExecutionStateSampler.ExecutionStateTracker tracker1 = sampler.create();
        ExecutionStateSampler.ExecutionState state1 = tracker1.create("shortId1", "ptransformId1", "ptransformIdName1", "process");
        ExecutionStateSampler.ExecutionStateTracker tracker2 = sampler.create();
        ExecutionStateSampler.ExecutionState state2 = tracker2.create("shortId2", "ptransformId2", "ptransformIdName2", "process");
        final CountDownLatch waitTillActive = new CountDownLatch(1);
        final CountDownLatch waitTillIntermediateReport = new CountDownLatch(1);
        final CountDownLatch waitTillStatesDeactivated = new CountDownLatch(1);
        final CountDownLatch waitForSamples = new CountDownLatch(1);
        final CountDownLatch waitForMoreSamples = new CountDownLatch(1);
        final CountDownLatch waitForEvenMoreSamples = new CountDownLatch(1);
        final Thread testThread = Thread.currentThread();
        Mockito.when(clock.getMillis()).thenAnswer(new Answer<Long>(){
            private long currentTime;

            @Override
            public Long answer(InvocationOnMock invocation) throws Throwable {
                if (Thread.currentThread().equals(testThread)) {
                    return 1L;
                }
                if (this.currentTime < 1000L) {
                    waitTillActive.await();
                    this.currentTime += 100L;
                } else if (this.currentTime < 1500L) {
                    waitForSamples.countDown();
                    waitTillIntermediateReport.await();
                    this.currentTime += 100L;
                } else if (this.currentTime == 1500L) {
                    waitForMoreSamples.countDown();
                    waitTillStatesDeactivated.await();
                    this.currentTime = 1600L;
                } else if (this.currentTime == 1600L) {
                    waitForEvenMoreSamples.countDown();
                }
                return this.currentTime;
            }
        });
        Assert.assertNull(tracker1.getCurrentThreadsPTransformId());
        Assert.assertNull(tracker2.getCurrentThreadsPTransformId());
        Assert.assertNull(tracker1.getStatus());
        Assert.assertNull(tracker2.getStatus());
        tracker1.start("bundleId1");
        tracker2.start("bundleId2");
        state1.activate();
        state2.activate();
        Assert.assertEquals("ptransformId1", tracker1.getCurrentThreadsPTransformId());
        Assert.assertEquals("ptransformId2", tracker2.getCurrentThreadsPTransformId());
        ExecutionStateSampler.ExecutionStateTrackerStatus activeBundleStatus1 = tracker1.getStatus();
        ExecutionStateSampler.ExecutionStateTrackerStatus activeBundleStatus2 = tracker2.getStatus();
        Assert.assertEquals("ptransformId1", activeBundleStatus1.getPTransformId());
        Assert.assertEquals("ptransformId2", activeBundleStatus2.getPTransformId());
        Assert.assertEquals("ptransformIdName1", activeBundleStatus1.getPTransformUniqueName());
        Assert.assertEquals("ptransformIdName2", activeBundleStatus2.getPTransformUniqueName());
        Assert.assertEquals(Thread.currentThread(), activeBundleStatus1.getTrackedThread());
        Assert.assertEquals(Thread.currentThread(), activeBundleStatus2.getTrackedThread());
        MatcherAssert.assertThat(activeBundleStatus1.getLastTransitionTimeMillis(), Matchers.equalTo(1L));
        MatcherAssert.assertThat(activeBundleStatus2.getLastTransitionTimeMillis(), Matchers.equalTo(1L));
        waitTillActive.countDown();
        waitForSamples.await();
        Assert.assertEquals("ptransformId1", tracker1.getCurrentThreadsPTransformId());
        Assert.assertEquals("ptransformId2", tracker2.getCurrentThreadsPTransformId());
        ExecutionStateSampler.ExecutionStateTrackerStatus activeStateStatus1 = tracker1.getStatus();
        ExecutionStateSampler.ExecutionStateTrackerStatus activeStateStatus2 = tracker2.getStatus();
        Assert.assertEquals("ptransformId1", activeStateStatus1.getPTransformId());
        Assert.assertEquals("ptransformId2", activeStateStatus2.getPTransformId());
        Assert.assertEquals("ptransformIdName1", activeStateStatus1.getPTransformUniqueName());
        Assert.assertEquals("ptransformIdName2", activeStateStatus2.getPTransformUniqueName());
        Assert.assertEquals(Thread.currentThread(), activeStateStatus1.getTrackedThread());
        Assert.assertEquals(Thread.currentThread(), activeStateStatus2.getTrackedThread());
        MatcherAssert.assertThat(activeStateStatus1.getLastTransitionTimeMillis(), Matchers.greaterThan(activeBundleStatus1.getLastTransitionTimeMillis()));
        MatcherAssert.assertThat(activeStateStatus2.getLastTransitionTimeMillis(), Matchers.greaterThan(activeBundleStatus2.getLastTransitionTimeMillis()));
        HashMap intermediateResults1 = new HashMap();
        HashMap intermediateResults2 = new HashMap();
        tracker1.updateIntermediateMonitoringData(intermediateResults1);
        tracker2.updateIntermediateMonitoringData(intermediateResults2);
        MatcherAssert.assertThat(MonitoringInfoEncodings.decodeInt64Counter((ByteString)intermediateResults1.get("shortId1")), Matchers.anyOf(Matchers.equalTo(900L), Matchers.equalTo(1000L)));
        MatcherAssert.assertThat(MonitoringInfoEncodings.decodeInt64Counter((ByteString)intermediateResults2.get("shortId2")), Matchers.anyOf(Matchers.equalTo(900L), Matchers.equalTo(1000L)));
        waitTillIntermediateReport.countDown();
        waitForMoreSamples.await();
        state1.deactivate();
        state2.deactivate();
        waitTillStatesDeactivated.countDown();
        waitForEvenMoreSamples.await();
        Assert.assertNull(tracker1.getCurrentThreadsPTransformId());
        Assert.assertNull(tracker2.getCurrentThreadsPTransformId());
        ExecutionStateSampler.ExecutionStateTrackerStatus inactiveStateStatus1 = tracker1.getStatus();
        ExecutionStateSampler.ExecutionStateTrackerStatus inactiveStateStatus2 = tracker2.getStatus();
        Assert.assertNull(inactiveStateStatus1.getPTransformId());
        Assert.assertNull(inactiveStateStatus2.getPTransformId());
        Assert.assertNull(inactiveStateStatus1.getPTransformUniqueName());
        Assert.assertNull(inactiveStateStatus2.getPTransformUniqueName());
        Assert.assertEquals(Thread.currentThread(), inactiveStateStatus1.getTrackedThread());
        Assert.assertEquals(Thread.currentThread(), inactiveStateStatus2.getTrackedThread());
        MatcherAssert.assertThat(inactiveStateStatus1.getLastTransitionTimeMillis(), Matchers.greaterThan(activeStateStatus1.getLastTransitionTimeMillis()));
        MatcherAssert.assertThat(inactiveStateStatus2.getLastTransitionTimeMillis(), Matchers.greaterThan(activeStateStatus1.getLastTransitionTimeMillis()));
        HashMap finalResults1 = new HashMap();
        HashMap finalResults2 = new HashMap();
        tracker1.updateFinalMonitoringData(finalResults1);
        tracker2.updateFinalMonitoringData(finalResults2);
        MatcherAssert.assertThat(MonitoringInfoEncodings.decodeInt64Counter((ByteString)finalResults1.get("shortId1")), Matchers.anyOf(Matchers.equalTo(1400L), Matchers.equalTo(1500L)));
        MatcherAssert.assertThat(MonitoringInfoEncodings.decodeInt64Counter((ByteString)finalResults2.get("shortId2")), Matchers.anyOf(Matchers.equalTo(1400L), Matchers.equalTo(1500L)));
        tracker1.reset();
        tracker2.reset();
        Assert.assertNull(tracker1.getCurrentThreadsPTransformId());
        Assert.assertNull(tracker2.getCurrentThreadsPTransformId());
        Assert.assertNull(tracker1.getStatus());
        Assert.assertNull(tracker2.getStatus());
        sampler.stop();
        this.expectedLogs.verifyNotLogged("Operation ongoing");
    }

    @Test
    public void testSamplingDoesntReportDuplicateFinalResults() throws Exception {
        DateTimeUtils.MillisProvider clock = Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler sampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs((String[])new String[]{"--experiments=state_sampling_period_millis=10"}).create(), clock);
        ExecutionStateSampler.ExecutionStateTracker tracker1 = sampler.create();
        ExecutionStateSampler.ExecutionState state1 = tracker1.create("shortId1", "ptransformId1", "ptransformIdName1", "process");
        ExecutionStateSampler.ExecutionStateTracker tracker2 = sampler.create();
        ExecutionStateSampler.ExecutionState state2 = tracker2.create("shortId2", "ptransformId2", "ptransformIdName2", "process");
        final CountDownLatch waitTillActive = new CountDownLatch(1);
        final CountDownLatch waitForSamples = new CountDownLatch(1);
        final Thread testThread = Thread.currentThread();
        Mockito.when(clock.getMillis()).thenAnswer(new Answer<Long>(){
            private long currentTime;

            @Override
            public Long answer(InvocationOnMock invocation) throws Throwable {
                if (Thread.currentThread().equals(testThread)) {
                    return 0L;
                }
                waitTillActive.await();
                if (this.currentTime < 1000L) {
                    this.currentTime += 100L;
                } else {
                    waitForSamples.countDown();
                }
                return this.currentTime;
            }
        });
        tracker1.start("bundleId1");
        tracker2.start("bundleId2");
        state1.activate();
        state2.activate();
        waitTillActive.countDown();
        waitForSamples.await();
        state1.deactivate();
        state2.deactivate();
        HashMap intermediateResults1 = new HashMap();
        HashMap intermediateResults2 = new HashMap();
        tracker1.updateIntermediateMonitoringData(intermediateResults1);
        tracker2.updateIntermediateMonitoringData(intermediateResults2);
        MatcherAssert.assertThat(MonitoringInfoEncodings.decodeInt64Counter((ByteString)intermediateResults1.get("shortId1")), Matchers.anyOf(Matchers.equalTo(900L), Matchers.equalTo(1000L)));
        MatcherAssert.assertThat(MonitoringInfoEncodings.decodeInt64Counter((ByteString)intermediateResults2.get("shortId2")), Matchers.anyOf(Matchers.equalTo(900L), Matchers.equalTo(1000L)));
        state1.deactivate();
        state2.deactivate();
        HashMap finalResults1 = new HashMap();
        HashMap finalResults2 = new HashMap();
        tracker1.updateFinalMonitoringData(finalResults1);
        tracker2.updateFinalMonitoringData(finalResults2);
        Assert.assertTrue(finalResults1.isEmpty());
        Assert.assertTrue(finalResults2.isEmpty());
        tracker1.reset();
        tracker2.reset();
        sampler.stop();
        this.expectedLogs.verifyNotLogged("Operation ongoing");
    }

    @Test
    public void testCountersReturnedAreBasedUponCurrentExecutionState() throws Exception {
        DateTimeUtils.MillisProvider clock = Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler sampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs((String[])new String[]{"--experiments=state_sampling_period_millis=10"}).create(), clock);
        ExecutionStateSampler.ExecutionStateTracker tracker = sampler.create();
        MetricsEnvironment.setCurrentContainer((MetricsContainer)tracker.getMetricsContainer());
        ExecutionStateSampler.ExecutionState state = tracker.create("shortId", "ptransformId", "uniqueName", "state");
        state.activate();
        TEST_USER_COUNTER.inc();
        TEST_USER_DISTRIBUTION.update(2L);
        TEST_USER_GAUGE.set(3L);
        TEST_USER_STRING_SET.add("ab");
        TEST_USER_BOUNDED_TRIE.add(new String[]{"bt_ab"});
        TEST_USER_HISTOGRAM.update(4.0);
        state.deactivate();
        TEST_USER_COUNTER.inc(11L);
        TEST_USER_DISTRIBUTION.update(12L);
        TEST_USER_GAUGE.set(13L);
        TEST_USER_STRING_SET.add("cd");
        TEST_USER_BOUNDED_TRIE.add(new String[]{"bt_cd"});
        TEST_USER_HISTOGRAM.update(14.0);
        TEST_USER_HISTOGRAM.update(14.0);
        Assert.assertEquals(1L, tracker.getMetricsContainerRegistry().getContainer("ptransformId").getCounter(TEST_USER_COUNTER.getName()).getCumulative());
        Assert.assertEquals(2L, tracker.getMetricsContainerRegistry().getContainer("ptransformId").getDistribution(TEST_USER_DISTRIBUTION.getName()).getCumulative().sum());
        Assert.assertEquals(3L, tracker.getMetricsContainerRegistry().getContainer("ptransformId").getGauge(TEST_USER_GAUGE.getName()).getCumulative().value());
        Assert.assertEquals(ImmutableSet.of((Object)"ab"), tracker.getMetricsContainerRegistry().getContainer("ptransformId").getStringSet(TEST_USER_STRING_SET.getName()).getCumulative().stringSet());
        Assert.assertEquals(BoundedTrieResult.create((Set)ImmutableSet.of((Object)ImmutableList.of((Object)"bt_ab", (Object)String.valueOf(false)))), tracker.getMetricsContainerRegistry().getContainer("ptransformId").getBoundedTrie(TEST_USER_BOUNDED_TRIE.getName()).getCumulative().extractResult());
        Assert.assertEquals(1L, tracker.getMetricsContainerRegistry().getContainer("ptransformId").getHistogram(TEST_USER_HISTOGRAM.getName(), (HistogramData.BucketType)HistogramData.LinearBuckets.of((double)0.0, (double)100.0, (int)1)).getCumulative().getCount(0));
        Assert.assertEquals(11L, tracker.getMetricsContainerRegistry().getUnboundContainer().getCounter(TEST_USER_COUNTER.getName()).getCumulative());
        Assert.assertEquals(12L, tracker.getMetricsContainerRegistry().getUnboundContainer().getDistribution(TEST_USER_DISTRIBUTION.getName()).getCumulative().sum());
        Assert.assertEquals(13L, tracker.getMetricsContainerRegistry().getUnboundContainer().getGauge(TEST_USER_GAUGE.getName()).getCumulative().value());
        Assert.assertEquals(ImmutableSet.of((Object)"cd"), tracker.getMetricsContainerRegistry().getUnboundContainer().getStringSet(TEST_USER_STRING_SET.getName()).getCumulative().stringSet());
        Assert.assertEquals(BoundedTrieResult.create((Set)ImmutableSet.of((Object)ImmutableList.of((Object)"bt_cd", (Object)String.valueOf(false)))), tracker.getMetricsContainerRegistry().getUnboundContainer().getBoundedTrie(TEST_USER_BOUNDED_TRIE.getName()).getCumulative().extractResult());
        Assert.assertEquals(2L, tracker.getMetricsContainerRegistry().getUnboundContainer().getHistogram(TEST_USER_HISTOGRAM.getName(), (HistogramData.BucketType)HistogramData.LinearBuckets.of((double)0.0, (double)100.0, (int)1)).getCumulative().getCount(0));
    }

    @Test
    public void testTrackerReuse() throws Exception {
        DateTimeUtils.MillisProvider clock = Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler sampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs((String[])new String[]{"--experiments=state_sampling_period_millis=10"}).create(), clock);
        ExecutionStateSampler.ExecutionStateTracker tracker = sampler.create();
        MetricsEnvironment.setCurrentContainer((MetricsContainer)tracker.getMetricsContainer());
        ExecutionStateSampler.ExecutionState state = tracker.create("shortId", "ptransformId", "ptransformIdName", "process");
        final CountDownLatch waitTillActive = new CountDownLatch(1);
        final CountDownLatch waitTillSecondStateActive = new CountDownLatch(1);
        final CountDownLatch waitForSamples = new CountDownLatch(1);
        final CountDownLatch waitForMoreSamples = new CountDownLatch(1);
        final Thread testThread = Thread.currentThread();
        Mockito.when(clock.getMillis()).thenAnswer(new Answer<Long>(){
            private long currentTime;

            @Override
            public Long answer(InvocationOnMock invocation) throws Throwable {
                if (Thread.currentThread().equals(testThread)) {
                    return 0L;
                }
                if (this.currentTime < 1000L) {
                    waitTillActive.await();
                    this.currentTime += 100L;
                } else if (this.currentTime < 1500L) {
                    waitForSamples.countDown();
                    waitTillSecondStateActive.await();
                    this.currentTime += 100L;
                } else {
                    waitForMoreSamples.countDown();
                }
                return this.currentTime;
            }
        });
        tracker.start("bundleId1");
        state.activate();
        waitTillActive.countDown();
        waitForSamples.await();
        TEST_USER_COUNTER.inc();
        state.deactivate();
        HashMap finalResults = new HashMap();
        tracker.updateFinalMonitoringData(finalResults);
        MatcherAssert.assertThat(MonitoringInfoEncodings.decodeInt64Counter((ByteString)finalResults.get("shortId")), Matchers.anyOf(Matchers.equalTo(900L), Matchers.equalTo(1000L)));
        Assert.assertEquals(1L, tracker.getMetricsContainerRegistry().getContainer("ptransformId").getCounter(TEST_USER_COUNTER.getName()).getCumulative());
        tracker.reset();
        tracker.start("bundleId2");
        state.activate();
        waitTillSecondStateActive.countDown();
        waitForMoreSamples.await();
        TEST_USER_COUNTER.inc();
        state.deactivate();
        finalResults = new HashMap();
        tracker.updateFinalMonitoringData(finalResults);
        MatcherAssert.assertThat(MonitoringInfoEncodings.decodeInt64Counter((ByteString)finalResults.get("shortId")), Matchers.anyOf(Matchers.equalTo(400L), Matchers.equalTo(500L)));
        Assert.assertEquals(1L, tracker.getMetricsContainerRegistry().getContainer("ptransformId").getCounter(TEST_USER_COUNTER.getName()).getCumulative());
        tracker.reset();
        this.expectedLogs.verifyNotLogged("Operation ongoing");
    }

    @Test
    public void testLullDetectionOccursInActiveBundle() throws Exception {
        DateTimeUtils.MillisProvider clock = Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler sampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs((String[])new String[]{"--experiments=state_sampling_period_millis=10"}).create(), clock);
        ExecutionStateSampler.ExecutionStateTracker tracker = sampler.create();
        final CountDownLatch waitTillActive = new CountDownLatch(1);
        final CountDownLatch waitForSamples = new CountDownLatch(10);
        final Thread testThread = Thread.currentThread();
        Mockito.when(clock.getMillis()).thenAnswer(new Answer<Long>(){
            private long currentTime;

            @Override
            public Long answer(InvocationOnMock invocation) throws Throwable {
                if (Thread.currentThread().equals(testThread)) {
                    return 0L;
                }
                waitTillActive.await();
                waitForSamples.countDown();
                this.currentTime += Duration.standardMinutes((long)1L).getMillis();
                return this.currentTime;
            }
        });
        tracker.start("bundleId");
        waitTillActive.countDown();
        waitForSamples.await();
        tracker.reset();
        sampler.stop();
        this.expectedLogs.verifyWarn("Operation ongoing in bundle bundleId for at least");
    }

    @Test
    public void testLullDetectionOccursInActiveState() throws Exception {
        DateTimeUtils.MillisProvider clock = Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler sampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs((String[])new String[]{"--experiments=state_sampling_period_millis=10"}).create(), clock);
        ExecutionStateSampler.ExecutionStateTracker tracker = sampler.create();
        ExecutionStateSampler.ExecutionState state = tracker.create("shortId", "ptransformId", "ptransformIdName", "process");
        final CountDownLatch waitTillActive = new CountDownLatch(1);
        final CountDownLatch waitForSamples = new CountDownLatch(10);
        final Thread testThread = Thread.currentThread();
        Mockito.when(clock.getMillis()).thenAnswer(new Answer<Long>(){
            private long currentTime;

            @Override
            public Long answer(InvocationOnMock invocation) throws Throwable {
                if (Thread.currentThread().equals(testThread)) {
                    return 0L;
                }
                waitTillActive.await();
                waitForSamples.countDown();
                this.currentTime += Duration.standardMinutes((long)1L).getMillis();
                return this.currentTime;
            }
        });
        tracker.start("bundleId");
        state.activate();
        waitTillActive.countDown();
        waitForSamples.await();
        state.deactivate();
        tracker.reset();
        sampler.stop();
        this.expectedLogs.verifyWarn("Operation ongoing in bundle bundleId for PTransform");
    }

    @Test
    public void testErrorState() throws Exception {
        DateTimeUtils.MillisProvider clock = Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler sampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs((String[])new String[]{"--experiments=state_sampling_period_millis=10"}).create(), clock);
        ExecutionStateSampler.ExecutionStateTracker tracker = sampler.create();
        ExecutionStateSampler.ExecutionState state1 = tracker.create("shortId1", "ptransformId1", "ptransformIdName1", "process");
        ExecutionStateSampler.ExecutionState state2 = tracker.create("shortId2", "ptransformId2", "ptransformIdName2", "process");
        state1.activate();
        state2.activate();
        Assert.assertTrue(state2.error());
        Assert.assertFalse(state2.error());
        state2.deactivate();
        Assert.assertFalse(state2.error());
        tracker.reset();
        Assert.assertTrue(state1.error());
    }
}

