/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.debug;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import junit.framework.TestCase;
import org.apache.beam.fn.harness.debug.ElementSample;
import org.apache.beam.fn.harness.debug.OutputSampler;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class OutputSamplerTest {
    /**
     * Builds the {@link BeamFnApi.SampledElement} a test expects for a plain integer sample.
     *
     * <p>The integer is serialized with {@link VarIntCoder} and the resulting bytes are used as
     * the element payload. No exception metadata is attached.
     *
     * @param i the integer to encode
     * @return a sampled element whose payload is the encoded integer
     * @throws IOException if the coder fails to encode the value
     */
    public BeamFnApi.SampledElement encodeInt(Integer i) throws IOException {
        ByteArrayOutputStream encoded = new ByteArrayOutputStream();
        VarIntCoder.of().encode(i, encoded);
        ByteString payload = ByteString.copyFrom(encoded.toByteArray());
        return BeamFnApi.SampledElement.newBuilder().setElement(payload).build();
    }

    /**
     * Builds the {@link BeamFnApi.SampledElement} a test expects for an integer wrapped in a
     * global-window {@link WindowedValue}.
     *
     * <p>The value is encoded with a {@link WindowedValue.FullWindowedValueCoder} composed of
     * {@link VarIntCoder} and the {@link GlobalWindow} coder, so the payload contains whatever
     * that coder writes for the whole windowed value rather than the bare integer bytes.
     *
     * @param i the integer to wrap and encode
     * @return a sampled element whose payload is the encoded windowed value
     * @throws IOException if the coder fails to encode the value
     */
    public BeamFnApi.SampledElement encodeGlobalWindowedInt(Integer i) throws IOException {
        // Parameterized coder instead of the raw type, so encode() is checked at compile time.
        WindowedValue.FullWindowedValueCoder<Integer> coder =
            WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        coder.encode(WindowedValue.valueInGlobalWindow(i), stream);
        return BeamFnApi.SampledElement.newBuilder()
            .setElement(ByteString.copyFrom(stream.toByteArray()))
            .build();
    }

    /**
     * Builds the {@link BeamFnApi.SampledElement} a test expects for an integer sample that has
     * exception metadata attached.
     *
     * @param i the integer to encode as the element payload (via {@link VarIntCoder})
     * @param error the error text stored on the exception metadata
     * @param ptransformId the transform id stored on the exception metadata
     * @param processBundleId the instruction id stored on the exception metadata; left unset
     *     when {@code null}
     * @return a sampled element carrying both the encoded payload and the exception metadata
     * @throws IOException if the coder fails to encode the value
     */
    public BeamFnApi.SampledElement encodeException(Integer i, String error, String ptransformId, @Nullable String processBundleId) throws IOException {
        ByteArrayOutputStream encoded = new ByteArrayOutputStream();
        VarIntCoder.of().encode(i, encoded);

        BeamFnApi.SampledElement.Exception.Builder exceptionBuilder =
            BeamFnApi.SampledElement.Exception.newBuilder();
        exceptionBuilder.setTransformId(ptransformId);
        exceptionBuilder.setError(error);
        // The instruction id is optional; only populate it when the caller supplied one.
        if (processBundleId != null) {
            exceptionBuilder.setInstructionId(processBundleId);
        }

        return BeamFnApi.SampledElement.newBuilder()
            .setElement(ByteString.copyFrom(encoded.toByteArray()))
            .setException(exceptionBuilder)
            .build();
    }

    /**
     * Samples the integers 0..14 through a sampler constructed with {@code (coder, 10, 10,
     * false)} and expects exactly the encoded values 0..9 to be returned, in any order.
     *
     * <p>NOTE(review): the two numeric constructor arguments are presumably the buffer capacity
     * and the sampling period; confirm against {@code OutputSampler}.
     */
    @Test
    public void testSamplesFirstN() throws IOException {
        OutputSampler<Integer> outputSampler = new OutputSampler<>(VarIntCoder.of(), 10, 10, false);

        // Deliberately feed more elements (15) than are expected back (10).
        for (int i = 0; i < 15; ++i) {
            outputSampler.sample(WindowedValue.valueInGlobalWindow(i));
        }

        List<BeamFnApi.SampledElement> expected = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            expected.add(encodeInt(i));
        }

        List<BeamFnApi.SampledElement> samples = outputSampler.samples();
        MatcherAssert.assertThat(samples, Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Constructs the sampler with a {@link WindowedValue.FullWindowedValueCoder} and expects the
     * single returned sample to equal the encoding of the whole global-window value (as produced
     * by {@code encodeGlobalWindowedInt}), not just the bare integer.
     */
    @Test
    public void testWindowedValueSample() throws IOException {
        WindowedValue.FullWindowedValueCoder<Integer> coder =
            WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
        // NOTE(review): relies on the OutputSampler constructor accepting any Coder (the
        // original passed it through a raw Coder cast) - confirm the parameter type.
        OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10, false);
        outputSampler.sample(WindowedValue.valueInGlobalWindow(0));

        List<BeamFnApi.SampledElement> expected = ImmutableList.of(encodeGlobalWindowedInt(0));
        List<BeamFnApi.SampledElement> samples = outputSampler.samples();
        MatcherAssert.assertThat(samples, Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Constructs the sampler with a plain {@link VarIntCoder} and expects the single returned
     * sample to equal the encoding of the bare integer (as produced by {@code encodeInt}), even
     * though the element was handed to the sampler wrapped in a {@link WindowedValue}.
     */
    @Test
    public void testNonWindowedValueSample() throws IOException {
        OutputSampler<Integer> outputSampler = new OutputSampler<>(VarIntCoder.of(), 10, 10, false);
        outputSampler.sample(WindowedValue.valueInGlobalWindow(0));

        List<BeamFnApi.SampledElement> expected = ImmutableList.of(encodeInt(0));
        List<BeamFnApi.SampledElement> samples = outputSampler.samples();
        MatcherAssert.assertThat(samples, Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Samples the integers 0..99 through a sampler constructed with {@code (coder, 5, 20,
     * false)} and expects exactly the encoded values 19, 39, 59, 79 and 99 to be returned, in
     * any order.
     *
     * <p>NOTE(review): this matches "keep 5 elements, take every 20th" with older entries being
     * overwritten; confirm the parameter meanings against {@code OutputSampler}.
     */
    @Test
    public void testActsLikeCircularBuffer() throws IOException {
        OutputSampler<Integer> outputSampler = new OutputSampler<>(VarIntCoder.of(), 5, 20, false);
        for (int i = 0; i < 100; ++i) {
            outputSampler.sample(WindowedValue.valueInGlobalWindow(i));
        }

        // Expected survivors: 19, 39, 59, 79, 99.
        List<BeamFnApi.SampledElement> expected = new ArrayList<>();
        for (int value = 19; value < 100; value += 20) {
            expected.add(encodeInt(value));
        }

        List<BeamFnApi.SampledElement> samples = outputSampler.samples();
        MatcherAssert.assertThat(samples, Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Samples one element, reports an exception against it, and expects the returned sample to
     * carry the element payload together with the exception text, transform id and process
     * bundle id.
     */
    @Test
    public void testCanSampleExceptions() throws IOException {
        OutputSampler<Integer> outputSampler = new OutputSampler<>(VarIntCoder.of(), 5, 20, false);

        WindowedValue<Integer> windowedValue = WindowedValue.valueInGlobalWindow(1);
        ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);

        Exception exception = new RuntimeException("Test exception");
        String ptransformId = "ptransform";
        String processBundleId = "processBundle";
        outputSampler.exception(elementSample, exception, ptransformId, processBundleId);

        List<BeamFnApi.SampledElement> expected = new ArrayList<>();
        expected.add(encodeException(1, exception.toString(), ptransformId, processBundleId));

        List<BeamFnApi.SampledElement> samples = outputSampler.samples();
        MatcherAssert.assertThat(samples, Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Reports the same exception instance against two different sampled elements (with two
     * different transform ids but the same process bundle id) and expects only the first
     * element to come back with exception metadata; the second comes back as a plain sample.
     */
    @Test
    public void testNoDuplicateExceptions() throws IOException {
        OutputSampler<Integer> outputSampler = new OutputSampler<>(VarIntCoder.of(), 5, 20, false);

        ElementSample<Integer> elementSampleA =
            outputSampler.sample(WindowedValue.valueInGlobalWindow(1));
        ElementSample<Integer> elementSampleB =
            outputSampler.sample(WindowedValue.valueInGlobalWindow(2));

        Exception exception = new RuntimeException("Test exception");
        String ptransformIdA = "ptransformA";
        String ptransformIdB = "ptransformB";
        String processBundleId = "processBundle";
        outputSampler.exception(elementSampleA, exception, ptransformIdA, processBundleId);
        outputSampler.exception(elementSampleB, exception, ptransformIdB, processBundleId);

        List<BeamFnApi.SampledElement> expected = new ArrayList<>();
        expected.add(encodeException(1, exception.toString(), ptransformIdA, processBundleId));
        expected.add(encodeInt(2));

        List<BeamFnApi.SampledElement> samples = outputSampler.samples();
        MatcherAssert.assertThat(samples, Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Reports an exception with a {@code null} process bundle id and expects the element to
     * come back as a plain sample, without exception metadata.
     */
    @Test
    public void testExceptionOnlySampledIfNonNullProcessBundle() throws IOException {
        OutputSampler<Integer> outputSampler = new OutputSampler<>(VarIntCoder.of(), 5, 20, false);

        WindowedValue<Integer> windowedValue = WindowedValue.valueInGlobalWindow(1);
        ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);

        Exception exception = new RuntimeException("Test exception");
        String ptransformId = "ptransform";
        // A null process bundle id is the condition under test.
        outputSampler.exception(elementSample, exception, ptransformId, null);

        List<BeamFnApi.SampledElement> expected = new ArrayList<>();
        expected.add(encodeInt(1));

        List<BeamFnApi.SampledElement> samples = outputSampler.samples();
        MatcherAssert.assertThat(samples, Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Samples element 0, then 99 further elements, and only afterwards reports an exception
     * against element 0. Expects the regular samples 19, 39, 59, 79 and 99 plus element 0 with
     * its exception metadata, i.e. the exception sample is returned in addition to the five
     * regular ones.
     */
    @Test
    public void testExceptionSamplesAreNotRemoved() throws IOException {
        OutputSampler<Integer> outputSampler = new OutputSampler<>(VarIntCoder.of(), 5, 20, false);

        WindowedValue<Integer> windowedValue = WindowedValue.valueInGlobalWindow(0);
        ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
        for (int i = 1; i < 100; ++i) {
            outputSampler.sample(WindowedValue.valueInGlobalWindow(i));
        }

        Exception exception = new RuntimeException("Test exception");
        String ptransformId = "ptransform";
        String processBundleId = "processBundle";
        outputSampler.exception(elementSample, exception, ptransformId, processBundleId);

        // Regular samples expected back: 19, 39, 59, 79, 99.
        List<BeamFnApi.SampledElement> expected = new ArrayList<>();
        for (int value = 19; value < 100; value += 20) {
            expected.add(encodeInt(value));
        }
        expected.add(encodeException(0, exception.toString(), ptransformId, processBundleId));

        List<BeamFnApi.SampledElement> samples = outputSampler.samples();
        MatcherAssert.assertThat(samples, Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Constructs the sampler with its fourth argument set to {@code true}, samples two elements
     * and reports an exception against only the second. Expects just the element with the
     * exception to be returned; the other sampled element (2) must not appear.
     */
    @Test
    public void testOnlySampleExceptions() throws IOException {
        OutputSampler<Integer> outputSampler = new OutputSampler<>(VarIntCoder.of(), 5, 20, true);

        WindowedValue<Integer> windowedValue = WindowedValue.valueInGlobalWindow(1);
        // This element never has an exception reported and is expected to be absent.
        outputSampler.sample(WindowedValue.valueInGlobalWindow(2));
        ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);

        Exception exception = new RuntimeException("Test exception");
        String ptransformId = "ptransform";
        String processBundleId = "processBundle";
        outputSampler.exception(elementSample, exception, ptransformId, processBundleId);

        List<BeamFnApi.SampledElement> expected = new ArrayList<>();
        expected.add(encodeException(1, exception.toString(), ptransformId, processBundleId));

        List<BeamFnApi.SampledElement> samples = outputSampler.samples();
        MatcherAssert.assertThat(samples, Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Runs two producer threads that sample into the same sampler while the test thread
     * repeatedly drains it, then checks that the drained elements from each producer appear in
     * ascending order, which is the order each producer emitted them.
     *
     * <p>Producer A emits 0..999999 and producer B emits -1000000..-1, so the sign of a decoded
     * element identifies which producer it came from.
     */
    @Test
    public void testConcurrentSamples() throws IOException, InterruptedException {
        VarIntCoder coder = VarIntCoder.of();
        OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 2, false);

        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(2);

        Thread sampleThreadA = new Thread(
            () -> sampleRangeWithExceptions(outputSampler, startSignal, doneSignal, 0, 1000000));
        Thread sampleThreadB = new Thread(
            () -> sampleRangeWithExceptions(outputSampler, startSignal, doneSignal, -1000000, 0));

        sampleThreadA.start();
        sampleThreadB.start();
        startSignal.countDown();

        // Drain continuously while the producers run so samples() races with sample().
        List<BeamFnApi.SampledElement> samples = new ArrayList<>();
        while (doneSignal.getCount() > 0L) {
            samples.addAll(outputSampler.samples());
        }
        sampleThreadA.join();
        sampleThreadB.join();

        // Split the drained elements by producer; non-negative values came from thread A.
        List<Integer> samplesFromThreadA = new ArrayList<>();
        List<Integer> samplesFromThreadB = new ArrayList<>();
        for (BeamFnApi.SampledElement sampledElement : samples) {
            int el = coder.decode(sampledElement.getElement().newInput());
            if (el >= 0) {
                samplesFromThreadA.add(el);
            } else {
                samplesFromThreadB.add(el);
            }
        }

        List<Integer> sortedSamplesFromThreadA = new ArrayList<>(samplesFromThreadA);
        List<Integer> sortedSamplesFromThreadB = new ArrayList<>(samplesFromThreadB);
        Collections.sort(sortedSamplesFromThreadA);
        Collections.sort(sortedSamplesFromThreadB);

        // Expected value first: the sorted copy is what the drained order must equal.
        TestCase.assertEquals(
            "samples from thread A are out of order", sortedSamplesFromThreadA, samplesFromThreadA);
        TestCase.assertEquals(
            "samples from thread B are out of order", sortedSamplesFromThreadB, samplesFromThreadB);
    }

    /**
     * Producer body for {@code testConcurrentSamples}: waits for the start signal, then samples
     * every integer in {@code [fromInclusive, toExclusive)} and reports an exception for each.
     *
     * <p>The done latch is always counted down, even if the thread is interrupted or sampling
     * throws, so the draining loop in the test cannot spin forever.
     */
    private static void sampleRangeWithExceptions(
            OutputSampler<Integer> sampler,
            CountDownLatch startSignal,
            CountDownLatch doneSignal,
            int fromInclusive,
            int toExclusive) {
        try {
            startSignal.await();
            for (int i = fromInclusive; i < toExclusive; ++i) {
                ElementSample<Integer> sample = sampler.sample(WindowedValue.valueInGlobalWindow(i));
                sampler.exception(sample, new RuntimeException(""), "ptransformId", "pbId");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            doneSignal.countDown();
        }
    }
}

