/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.data;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link PTransformFunctionRegistry}.
 *
 * <p>Each test creates a fresh execution state tracker from a shared sampler, installs the
 * tracker's metrics container as the current one, registers functions against PTransform ids and
 * then runs the functions returned by the registry.
 */
@RunWith(JUnit4.class)
public class PTransformFunctionRegistryTest {
    /** URN handed to every registry built by these tests. */
    private static final String START_BUNDLE_MSECS_URN = "beam:metric:pardo_execution_time:start_bundle_msecs:v1";
    private static final Counter TEST_USER_COUNTER = Metrics.counter("foo", "bar");
    private ExecutionStateSampler sampler;

    /** Creates the sampler from which each test obtains its tracker. */
    @Before
    public void setUp() {
        sampler = new ExecutionStateSampler(PipelineOptionsFactory.create(), System::currentTimeMillis);
    }

    /** Clears the current metrics container installed by the test and stops the sampler. */
    @After
    public void tearDown() {
        MetricsEnvironment.setCurrentContainer(null);
        sampler.stop();
    }

    /**
     * Verifies that, while a registered function runs, the tracker status reports the calling
     * thread and the PTransform id the function was registered under.
     */
    @Test
    public void testStateTrackerRecordsStateTransitions() throws Exception {
        ExecutionStateSampler.ExecutionStateTracker executionStateTracker = sampler.create();
        MetricsEnvironment.setCurrentContainer((MetricsContainer) executionStateTracker.getMetricsContainer());
        PTransformFunctionRegistry testObject =
                new PTransformFunctionRegistry(new ShortIdMap(), executionStateTracker, START_BUNDLE_MSECS_URN);
        AtomicBoolean runnableAWasCalled = new AtomicBoolean();
        AtomicBoolean runnableBWasCalled = new AtomicBoolean();
        testObject.register(
                "pTransformA",
                "pTranformAName",
                recordCallAndAssertTrackedAs(runnableAWasCalled, executionStateTracker, "pTransformA"));
        testObject.register(
                "pTransformB",
                "pTranformBName",
                recordCallAndAssertTrackedAs(runnableBWasCalled, executionStateTracker, "pTransformB"));

        executionStateTracker.start("testBundleId");
        try {
            for (ThrowingRunnable func : testObject.getFunctions()) {
                func.run();
            }
        } finally {
            // Reset even when a function's assertion fails so the started tracker is not left active.
            executionStateTracker.reset();
        }

        Assert.assertTrue(runnableAWasCalled.get());
        Assert.assertTrue(runnableBWasCalled.get());
    }

    /**
     * Verifies that counter increments made inside a registered function land in the metrics
     * container of that function's PTransform id, while an increment made outside any registered
     * function lands in the unbound container.
     */
    @Test
    public void testMetricsUponRunningFunctions() throws Exception {
        ExecutionStateSampler.ExecutionStateTracker executionStateTracker = sampler.create();
        MetricsEnvironment.setCurrentContainer((MetricsContainer) executionStateTracker.getMetricsContainer());
        PTransformFunctionRegistry testObject =
                new PTransformFunctionRegistry(new ShortIdMap(), executionStateTracker, START_BUNDLE_MSECS_URN);
        testObject.register("pTransformA", "pTranformAName", () -> TEST_USER_COUNTER.inc());
        testObject.register("pTransformB", "pTranformBName", () -> TEST_USER_COUNTER.inc(2L));

        executionStateTracker.start("testBundleId");
        try {
            for (ThrowingRunnable func : testObject.getFunctions()) {
                func.run();
            }
            // Incremented outside of any registered function.
            TEST_USER_COUNTER.inc(3L);

            // Read each cumulative value into a primitive long so that the assertEquals(long, long)
            // overload is selected whether getCumulative() yields a primitive or a boxed value.
            long pTransformACount = executionStateTracker.getMetricsContainerRegistry()
                    .getContainer("pTransformA")
                    .getCounter(TEST_USER_COUNTER.getName())
                    .getCumulative();
            long pTransformBCount = executionStateTracker.getMetricsContainerRegistry()
                    .getContainer("pTransformB")
                    .getCounter(TEST_USER_COUNTER.getName())
                    .getCumulative();
            long unboundCount = executionStateTracker.getMetricsContainerRegistry()
                    .getUnboundContainer()
                    .getCounter(TEST_USER_COUNTER.getName())
                    .getCumulative();

            Assert.assertEquals(1L, pTransformACount);
            Assert.assertEquals(2L, pTransformBCount);
            Assert.assertEquals(3L, unboundCount);
        } finally {
            // The counters are read before this point; reset runs last, as a cleanup step,
            // even when one of the assertions above fails.
            executionStateTracker.reset();
        }
    }

    /**
     * Builds a runnable that flags {@code wasCalled} and then asserts that the tracker's current
     * status is non-null, is tracking the calling thread and reports {@code expectedPTransformId}.
     *
     * @param wasCalled flag set to {@code true} as soon as the runnable is invoked
     * @param tracker tracker whose status is inspected at invocation time
     * @param expectedPTransformId PTransform id the status is expected to report
     * @return the asserting runnable
     */
    private static ThrowingRunnable recordCallAndAssertTrackedAs(
            AtomicBoolean wasCalled,
            ExecutionStateSampler.ExecutionStateTracker tracker,
            String expectedPTransformId) {
        return () -> {
            wasCalled.set(true);
            ExecutionStateSampler.ExecutionStateTrackerStatus status = tracker.getStatus();
            Assert.assertNotNull(status);
            Assert.assertEquals(Thread.currentThread(), status.getTrackedThread());
            Assert.assertEquals(expectedPTransformId, status.getPTransformId());
        };
    }
}

