/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.StringContains;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link FinalizeBundleHandler}.
 *
 * <p>Each test builds its own handler on top of a fresh cached thread pool and shuts that pool
 * down in a {@code finally} block, so no pool threads are left running after the test returns.
 */
@RunWith(JUnit4.class)
public class FinalizeBundleHandlerTest {
    /** Instruction id placed on every request built by {@link #requestFor(String)}. */
    private static final String INSTRUCTION_ID = "instructionId";

    /** Bundle id used both to register callbacks and to request finalization. */
    private static final String BUNDLE_ID = "test";

    /** Response expected from the handler whenever finalization completes without throwing. */
    private static final BeamFnApi.InstructionResponse SUCCESSFUL_RESPONSE =
            BeamFnApi.InstructionResponse.newBuilder()
                    .setFinalizeBundle(BeamFnApi.FinalizeBundleResponse.getDefaultInstance())
                    .build();

    /**
     * Registers two callbacks for a bundle and verifies that finalizing that bundle returns the
     * successful response and that both callbacks have run by the time the call returns.
     */
    @Test
    public void testRegistrationAndCallback() throws Exception {
        AtomicBoolean wasCalled1 = new AtomicBoolean();
        AtomicBoolean wasCalled2 = new AtomicBoolean();
        List<FinalizeBundleHandler.CallbackRegistration> callbacks = new ArrayList<>();
        callbacks.add(
                FinalizeBundleHandler.CallbackRegistration.create(
                        oneHourFromNow(), () -> wasCalled1.set(true)));
        callbacks.add(
                FinalizeBundleHandler.CallbackRegistration.create(
                        oneHourFromNow(), () -> wasCalled2.set(true)));

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            FinalizeBundleHandler handler = new FinalizeBundleHandler(executor);
            handler.registerCallbacks(BUNDLE_ID, callbacks);

            Assert.assertEquals(
                    SUCCESSFUL_RESPONSE, handler.finalizeBundle(requestFor(BUNDLE_ID)).build());
            Assert.assertTrue("first callback was not invoked", wasCalled1.get());
            Assert.assertTrue("second callback was not invoked", wasCalled2.get());
        } finally {
            // Release the pool's threads; nothing in this test needs them past this point.
            executor.shutdownNow();
        }
    }

    /**
     * Verifies that finalizing a bundle id for which nothing was registered still yields the
     * successful response.
     */
    @Test
    public void testFinalizationIgnoresMissingBundleIds() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            FinalizeBundleHandler handler = new FinalizeBundleHandler(executor);

            Assert.assertEquals(
                    SUCCESSFUL_RESPONSE, handler.finalizeBundle(requestFor(BUNDLE_ID)).build());
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Registers two callbacks that both throw and verifies that finalization fails with a single
     * exception carrying two suppressed exceptions, and that both callbacks were still invoked.
     */
    @Test
    public void testFinalizationContinuesToNextCallbackEvenInFailure() throws Exception {
        AtomicBoolean wasCalled1 = new AtomicBoolean();
        AtomicBoolean wasCalled2 = new AtomicBoolean();
        List<FinalizeBundleHandler.CallbackRegistration> callbacks = new ArrayList<>();
        callbacks.add(
                FinalizeBundleHandler.CallbackRegistration.create(
                        oneHourFromNow(),
                        () -> {
                            wasCalled1.set(true);
                            throw new Exception("testException1");
                        }));
        callbacks.add(
                FinalizeBundleHandler.CallbackRegistration.create(
                        oneHourFromNow(),
                        () -> {
                            wasCalled2.set(true);
                            throw new Exception("testException2");
                        }));

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            FinalizeBundleHandler handler = new FinalizeBundleHandler(executor);
            handler.registerCallbacks(BUNDLE_ID, callbacks);

            // Capture the failure instead of asserting inside the catch block so that a missing
            // exception is reported with an explicit message.
            Exception thrown = null;
            try {
                handler.finalizeBundle(requestFor(BUNDLE_ID));
            } catch (Exception e) {
                thrown = e;
            }

            Assert.assertNotNull(
                    "finalizeBundle should throw when registered callbacks fail", thrown);
            MatcherAssert.assertThat(
                    thrown.getMessage(),
                    StringContains.containsString(
                            "Failed to handle bundle finalization for bundle"));
            Assert.assertEquals(2, thrown.getSuppressed().length);
            Assert.assertTrue("first callback was not invoked", wasCalled1.get());
            Assert.assertTrue("second callback was not invoked", wasCalled2.get());
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Returns the instant one hour after the current time, used as the timestamp argument of
     * {@code CallbackRegistration.create} (presumably the callback's expiry - confirm against
     * {@link FinalizeBundleHandler}).
     */
    private static Instant oneHourFromNow() {
        return Instant.now().plus(Duration.standardHours(1));
    }

    /**
     * Builds a finalize-bundle instruction request.
     *
     * @param bundleId instruction id set on the nested {@code FinalizeBundleRequest}, i.e. the
     *     bundle to finalize
     * @return a request whose own instruction id is {@link #INSTRUCTION_ID}
     */
    private static BeamFnApi.InstructionRequest requestFor(String bundleId) {
        BeamFnApi.FinalizeBundleRequest finalizeRequest =
                BeamFnApi.FinalizeBundleRequest.newBuilder().setInstructionId(bundleId).build();
        return BeamFnApi.InstructionRequest.newBuilder()
                .setInstructionId(INSTRUCTION_ID)
                .setFinalizeBundle(finalizeRequest)
                .build();
    }
}

