/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.Metrics;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class MetricsTest {
    private static final MetricName TEST_NAME = MetricName.named((String)"testNamespace", (String)"testName");
    private static final String TEST_ID = "testId";
    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);

    @Test
    public void testAccurateBundleCounterReportsValueFirstTimeWithoutMutations() throws Exception {
        HashMap report = new HashMap();
        Metrics.BundleCounter bundleCounter = Metrics.bundleProcessingThreadCounter((String)TEST_ID, (MetricName)TEST_NAME);
        bundleCounter.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(0L)));
        report.clear();
        bundleCounter.updateFinalMonitoringData(report);
        Assert.assertEquals(report, Collections.emptyMap());
        bundleCounter.reset();
        bundleCounter.updateFinalMonitoringData(report);
        Assert.assertEquals(report, Collections.emptyMap());
    }

    @Test
    public void testAccurateBundleDistributionReportsValueFirstTimeWithoutMutations() throws Exception {
        HashMap report = new HashMap();
        Metrics.BundleDistribution bundleDistribution = Metrics.bundleProcessingThreadDistribution((String)TEST_ID, (MetricName)TEST_NAME);
        bundleDistribution.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.EMPTY)));
        report.clear();
        bundleDistribution.updateFinalMonitoringData(report);
        Assert.assertEquals(report, Collections.emptyMap());
        bundleDistribution.reset();
        bundleDistribution.updateFinalMonitoringData(report);
        Assert.assertEquals(report, Collections.emptyMap());
    }

    @Test
    public void testAccurateBundleCounterWithMutations() throws Exception {
        HashMap report = new HashMap();
        Metrics.BundleCounter bundleCounter = Metrics.bundleProcessingThreadCounter((String)TEST_ID, (MetricName)TEST_NAME);
        bundleCounter.inc(7L);
        bundleCounter.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(7L)));
        report.clear();
        bundleCounter.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.emptyMap());
        bundleCounter.inc();
        bundleCounter.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(8L)));
        report.clear();
        bundleCounter.dec(4L);
        bundleCounter.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(4L)));
        bundleCounter.dec();
        bundleCounter.updateFinalMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(3L)));
        bundleCounter.reset();
        bundleCounter.inc(7L);
        bundleCounter.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(7L)));
        report.clear();
        bundleCounter.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.emptyMap());
        bundleCounter.inc();
        bundleCounter.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(8L)));
        report.clear();
        bundleCounter.dec(4L);
        bundleCounter.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(4L)));
        bundleCounter.dec();
        bundleCounter.updateFinalMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(3L)));
    }

    @Test
    public void testAccurateBundleDistributionWithMutations() throws Exception {
        HashMap report = new HashMap();
        Metrics.BundleDistribution bundleDistribution = Metrics.bundleProcessingThreadDistribution((String)TEST_ID, (MetricName)TEST_NAME);
        bundleDistribution.update(7L);
        bundleDistribution.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.singleton(7L))));
        report.clear();
        bundleDistribution.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.emptyMap());
        bundleDistribution.update(5L, 2L, 2L, 3L);
        bundleDistribution.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.create(12L, 3L, 2L, 7L))));
        report.clear();
        bundleDistribution.reset();
        bundleDistribution.update(7L);
        bundleDistribution.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.singleton(7L))));
        report.clear();
        bundleDistribution.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.emptyMap());
        bundleDistribution.update(5L, 2L, 2L, 3L);
        bundleDistribution.updateIntermediateMonitoringData(report);
        Assert.assertEquals(report, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.create(12L, 3L, 2L, 7L))));
    }

    @Test
    public void testAccurateBundleCounterUsingMultipleThreads() throws Exception {
        Metrics.BundleCounter bundleCounter = Metrics.bundleProcessingThreadCounter((String)TEST_ID, (MetricName)TEST_NAME);
        List<ByteString> values = this.testAccurateBundleMetricUsingMultipleThreads((BundleProgressReporter)bundleCounter, () -> bundleCounter.inc());
        Assert.assertTrue(values.size() >= 10);
        ArrayList<Long> sortedValues = new ArrayList<Long>();
        for (ByteString value : values) {
            sortedValues.add(MonitoringInfoEncodings.decodeInt64Counter(value));
        }
        Collections.sort(sortedValues);
        ArrayList<ByteString> sortedEncodedValues = new ArrayList<ByteString>();
        for (Long value : sortedValues) {
            sortedEncodedValues.add(MonitoringInfoEncodings.encodeInt64Counter(value));
        }
        MatcherAssert.assertThat(values, Matchers.contains(sortedEncodedValues.toArray()));
    }

    @Test
    public void testAccurateBundleDistributionUsingMultipleThreads() throws Exception {
        Metrics.BundleDistribution bundleDistribution = Metrics.bundleProcessingThreadDistribution((String)TEST_ID, (MetricName)TEST_NAME);
        List<ByteString> values = this.testAccurateBundleMetricUsingMultipleThreads((BundleProgressReporter)bundleDistribution, () -> bundleDistribution.update(1L));
        Assert.assertTrue(values.size() >= 10);
        ArrayList<DistributionData> sortedValues = new ArrayList<DistributionData>();
        for (ByteString value : values) {
            sortedValues.add(MonitoringInfoEncodings.decodeInt64Distribution(value));
        }
        Collections.sort(sortedValues, Comparator.comparingLong(DistributionData::count));
        ArrayList<ByteString> sortedEncodedValues = new ArrayList<ByteString>();
        for (DistributionData value : sortedValues) {
            sortedEncodedValues.add(MonitoringInfoEncodings.encodeInt64Distribution(value));
        }
        MatcherAssert.assertThat(values, Matchers.contains(sortedEncodedValues.toArray()));
    }

    private List<ByteString> testAccurateBundleMetricUsingMultipleThreads(BundleProgressReporter metric, MetricMutator metricMutator) throws Exception {
        ReentrantLock progressLock = new ReentrantLock();
        ArrayList<ByteString> reportedValues = new ArrayList<ByteString>();
        AtomicBoolean shouldMainExit = new AtomicBoolean();
        AtomicBoolean shouldProgressExit = new AtomicBoolean();
        Future intermediateProgressTask = this.executor.submit(() -> {
            while (!shouldProgressExit.get()) {
                HashMap data = new HashMap();
                progressLock.lock();
                try {
                    metric.updateIntermediateMonitoringData(data);
                    if (data.isEmpty()) continue;
                    reportedValues.add((ByteString)data.get(TEST_ID));
                    if (reportedValues.size() < 10) continue;
                    shouldMainExit.set(true);
                }
                finally {
                    progressLock.unlock();
                }
            }
            return null;
        });
        Future mainTask = this.executor.submit(() -> {
            HashMap data = new HashMap();
            while (!shouldMainExit.get()) {
                metricMutator.mutate();
            }
            progressLock.lock();
            try {
                metric.updateFinalMonitoringData(data);
                if (!data.isEmpty()) {
                    reportedValues.add((ByteString)data.get(TEST_ID));
                }
            }
            finally {
                progressLock.unlock();
            }
            shouldProgressExit.set(true);
            return null;
        });
        intermediateProgressTask.get();
        mainTask.get();
        return reportedValues;
    }

    private static interface MetricMutator {
        public void mutate();
    }
}

