/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.module.extension.internal.runtime.source;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lock.LockFactory;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.scheduler.SchedulingStrategy;
import org.mule.runtime.api.store.ObjectStoreManager;
import org.mule.runtime.api.store.ObjectStoreSettings;
import org.mule.runtime.core.api.exception.SystemExceptionHandler;
import org.mule.runtime.core.api.util.ClassUtils;
import org.mule.runtime.core.api.util.UUID;
import org.mule.runtime.module.extension.internal.runtime.source.poll.PollingSourceWrapper;
import org.mule.sdk.api.runtime.operation.Result;
import org.mule.sdk.api.runtime.source.PollContext;
import org.mule.sdk.api.runtime.source.PollingSource;
import org.mule.sdk.api.runtime.source.SourceCallback;
import org.mule.tck.size.SmallTest;
import org.slf4j.Logger;

@SmallTest
@RunWith(value=MockitoJUnitRunner.class)
public class PollingSourceWrapperTestCase {
    public static final String TEST_FLOW_NAME = "myFlow";
    public static final String EXPECTED_WATERMARK_OS = "_pollingSource_myFlow/watermark";
    public static final String EXPECTED_RECENT_IDS_OS = "_pollingSource_myFlow/recently-processed-ids";
    public static final String EXPECTED_IDS_UPDATED_WATERMARK_OS = "_pollingSource_myFlow/ids-on-updated-watermark";
    public static final String EXPECTED_INFLIGHT_IDS_OS = "_pollingSource_myFlow/inflight-ids";
    private static final String POLL_ITEM_ID = UUID.getUUID().toString();
    @Mock(answer=Answers.RETURNS_DEEP_STUBS)
    private LockFactory lockFactoryMock;
    @Mock(answer=Answers.RETURNS_DEEP_STUBS)
    private ObjectStoreManager objectStoreManagerMock;
    @Mock(answer=Answers.RETURNS_DEEP_STUBS)
    private SchedulerService schedulerServiceMock;
    @Mock
    private ComponentLocation componentLocationMock;
    @Mock(answer=Answers.RETURNS_DEEP_STUBS)
    private SourceCallback callbackMock;
    private PollingSource pollingSource = (PollingSource)Mockito.mock(PollingSource.class);
    private SchedulingStrategy schedulingStrategy = (SchedulingStrategy)Mockito.mock(SchedulingStrategy.class);
    private Logger logger;
    private List<String> debugMessages;
    private List<String> traceMessages;
    @InjectMocks
    private PollingSourceWrapper<Object, Object> pollingSourceWrapper = new PollingSourceWrapper(this.pollingSource, this.schedulingStrategy, 4, (SystemExceptionHandler)Mockito.mock(SystemExceptionHandler.class));

    @Before
    public void setUp() throws Exception {
        Mockito.when((Object)this.componentLocationMock.getRootContainerName()).thenReturn((Object)TEST_FLOW_NAME);
        this.setComponentLocationMock();
        Mockito.when((Object)this.schedulingStrategy.schedule((Scheduler)ArgumentMatchers.any(), (Runnable)ArgumentMatchers.any())).thenAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                Runnable runnable = (Runnable)invocation.getArgument(1);
                runnable.run();
                return null;
            }
        });
        Mockito.when((Object)this.lockFactoryMock.createLock(ArgumentMatchers.anyString()).tryLock()).thenReturn((Object)true);
        this.logger = this.createMockLogger();
        this.debugMessages = new ArrayList<String>();
        this.traceMessages = new ArrayList<String>();
    }

    @Test
    public void waterMarkingStoresGetCreatedOnStart() throws MuleException {
        this.pollingSourceWrapper.onStart(this.callbackMock);
        this.assertPersistentStoreIsCreated(EXPECTED_WATERMARK_OS, ObjectStoreSettings.DEFAULT_EXPIRATION_INTERVAL);
        this.assertPersistentStoreIsCreated(EXPECTED_RECENT_IDS_OS, ObjectStoreSettings.DEFAULT_EXPIRATION_INTERVAL);
        this.assertPersistentStoreIsCreated(EXPECTED_IDS_UPDATED_WATERMARK_OS, ObjectStoreSettings.DEFAULT_EXPIRATION_INTERVAL);
    }

    @Test
    public void idempotencyStoreGetsCreatedOnStart() throws MuleException {
        this.pollingSourceWrapper.onStart(this.callbackMock);
        this.assertTransientStoreIsCreated(EXPECTED_INFLIGHT_IDS_OS, ObjectStoreSettings.DEFAULT_EXPIRATION_INTERVAL);
    }

    @Test
    public void loggingOnAcceptedItem() throws MuleException, Exception {
        this.stubPollItem(Collections.singletonList(null), Collections.singletonList(null));
        this.startSourcePollWithMockedLogger();
        this.verifyLogMessage(this.debugMessages, "Item with id:[%s] is accepted", "");
    }

    @Test
    public void loggingOnRejectedItem() throws Exception {
        Mockito.when((Object)this.lockFactoryMock.createLock(ArgumentMatchers.anyString()).tryLock()).thenReturn((Object)false);
        this.stubPollItem(Collections.singletonList(POLL_ITEM_ID), Collections.singletonList(null));
        this.startSourcePollWithMockedLogger();
        this.verifyLogMessage(this.debugMessages, "Item with id:[%s] is rejected with status:[%s]", POLL_ITEM_ID, PollContext.PollItemStatus.ALREADY_IN_PROCESS);
    }

    @Test
    public void loggingOnCreatedWatermark() throws Exception {
        String watermark = "5";
        this.stubPollItem(Collections.singletonList(POLL_ITEM_ID), Collections.singletonList(watermark));
        this.startSourcePollWithMockedLogger();
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "watermark", watermark, TEST_FLOW_NAME);
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "updatedWatermark", watermark, TEST_FLOW_NAME);
    }

    @Test
    public void loggingOnUpdatedWatermark() throws Exception {
        List<String> ids = Arrays.asList("id1", "id2", "id3", "id4");
        List<Serializable> watermarks = Arrays.asList(1, 3, 5, 8);
        this.stubPollItem(ids, watermarks);
        this.startSourcePollWithMockedLogger();
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "updatedWatermark", 1, TEST_FLOW_NAME);
        this.verifyLogMessage(this.traceMessages, "Watermark comparison of {}:[{}] with {}:[{}] for flow:[{}] returns:[{}]", "updatedWatermark", 1, "itemWatermark", 3, TEST_FLOW_NAME, -1);
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "updatedWatermark", 3, TEST_FLOW_NAME);
        this.verifyLogMessage(this.traceMessages, "Watermark comparison of {}:[{}] with {}:[{}] for flow:[{}] returns:[{}]", "updatedWatermark", 3, "itemWatermark", 5, TEST_FLOW_NAME, -1);
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "updatedWatermark", 5, TEST_FLOW_NAME);
        this.verifyLogMessage(this.traceMessages, "Watermark comparison of {}:[{}] with {}:[{}] for flow:[{}] returns:[{}]", "updatedWatermark", 5, "itemWatermark", 8, TEST_FLOW_NAME, -1);
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "updatedWatermark", 8, TEST_FLOW_NAME);
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "watermark", 8, TEST_FLOW_NAME);
    }

    @Test
    public void loggingOnUpdatedWatermarkWithPollLimit() throws MuleException, Exception {
        List<String> ids = Arrays.asList("id1", "id2", "id3", "id4", "id5");
        List<Serializable> watermarks = Arrays.asList(1, 3, 5, 8, 4);
        this.stubPollItem(ids, watermarks);
        this.startSourcePollWithMockedLogger();
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "updatedWatermark", 1, TEST_FLOW_NAME);
        this.verifyLogMessage(this.traceMessages, "Watermark comparison of {}:[{}] with {}:[{}] for flow:[{}] returns:[{}]", "updatedWatermark", 1, "itemWatermark", 3, TEST_FLOW_NAME, -1);
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "updatedWatermark", 3, TEST_FLOW_NAME);
        this.verifyLogMessage(this.traceMessages, "Watermark comparison of {}:[{}] with {}:[{}] for flow:[{}] returns:[{}]", "updatedWatermark", 3, "itemWatermark", 5, TEST_FLOW_NAME, -1);
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "updatedWatermark", 5, TEST_FLOW_NAME);
        this.verifyLogMessage(this.traceMessages, "Watermark comparison of {}:[{}] with {}:[{}] for flow:[{}] returns:[{}]", "updatedWatermark", 5, "itemWatermark", 8, TEST_FLOW_NAME, -1);
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "updatedWatermark", 8, TEST_FLOW_NAME);
        this.verifyLogMessage(this.traceMessages, "Watermark comparison of {}:[{}] with {}:[{}] for flow:[{}] returns:[{}]", "updatedWatermark", 8, "itemWatermark", 4, TEST_FLOW_NAME, 1);
        this.verifyLogMessage(this.traceMessages, "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]", "watermark", 4, TEST_FLOW_NAME);
    }

    private void assertPersistentStoreIsCreated(String expectedName, Long expirationInterval) {
        this.assertStoreIsCreated(expectedName, true, expirationInterval);
    }

    private void assertTransientStoreIsCreated(String expectedName, Long expirationInterval) {
        this.assertStoreIsCreated(expectedName, false, expirationInterval);
    }

    private void assertStoreIsCreated(String expectedName, boolean isPersistent, Long expirationInterval) {
        ArgumentCaptor settingsCaptor = ArgumentCaptor.forClass(ObjectStoreSettings.class);
        ((ObjectStoreManager)Mockito.verify((Object)this.objectStoreManagerMock)).getOrCreateObjectStore((String)ArgumentMatchers.eq((Object)expectedName), (ObjectStoreSettings)settingsCaptor.capture());
        ObjectStoreSettings watermarkSettings = (ObjectStoreSettings)settingsCaptor.getValue();
        MatcherAssert.assertThat((Object)watermarkSettings.isPersistent(), (Matcher)CoreMatchers.is((Object)isPersistent));
        MatcherAssert.assertThat((Object)watermarkSettings.getExpirationInterval(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.equalTo((Object)expirationInterval)));
    }

    private void setComponentLocationMock() throws Exception {
        ClassUtils.setFieldValue(this.pollingSourceWrapper, (String)"componentLocation", (Object)this.componentLocationMock, (boolean)false);
    }

    private Logger createMockLogger() {
        Logger logger = (Logger)Mockito.mock(Logger.class);
        Answer answer = new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                String method = invocation.getMethod().getName();
                String message = (String)invocation.getArgument(0, String.class);
                Object[] messageArgs = Arrays.copyOfRange(invocation.getArguments(), 1, invocation.getArguments().length);
                if (method.equals("debug")) {
                    PollingSourceWrapperTestCase.this.debugMessages.add(PollingSourceWrapperTestCase.this.formatMessage(message, messageArgs));
                } else {
                    PollingSourceWrapperTestCase.this.traceMessages.add(PollingSourceWrapperTestCase.this.formatMessage(message, messageArgs));
                }
                return null;
            }
        };
        ((Logger)Mockito.doAnswer((Answer)answer).when((Object)logger)).debug(ArgumentMatchers.anyString(), ArgumentMatchers.any());
        ((Logger)Mockito.doAnswer((Answer)answer).when((Object)logger)).debug(ArgumentMatchers.anyString(), ArgumentMatchers.any(), ArgumentMatchers.any());
        ((Logger)Mockito.doAnswer((Answer)answer).when((Object)logger)).trace(ArgumentMatchers.anyString(), ArgumentMatchers.any(), ArgumentMatchers.any());
        ((Logger)Mockito.doAnswer((Answer)answer).when((Object)logger)).trace(ArgumentMatchers.anyString(), (Object[])ArgumentMatchers.any());
        return logger;
    }

    private String formatMessage(String message, Object ... args) {
        String newMessage = message.replaceAll("\\{\\}", "%s");
        return String.format(newMessage, args);
    }

    private void startSourcePollWithMockedLogger() throws Exception {
        Logger origLogger = this.setLogger(this.pollingSourceWrapper, "LOGGER", this.logger);
        try {
            this.pollingSourceWrapper.onStart(this.callbackMock);
        }
        finally {
            this.setLogger(this.pollingSourceWrapper, "LOGGER", origLogger);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Logger setLogger(Object object, String fieldName, Logger newLogger) throws Exception {
        Logger oldLogger;
        Field field = object.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        try {
            Field modifiersField = Field.class.getDeclaredField("modifiers");
            modifiersField.setAccessible(true);
            modifiersField.setInt(field, field.getModifiers() & 0xFFFFFFEF);
            try {
                oldLogger = (Logger)field.get(null);
                field.set(null, newLogger);
            }
            finally {
                modifiersField.setInt(field, field.getModifiers());
                modifiersField.setAccessible(false);
            }
        }
        finally {
            field.setAccessible(false);
        }
        return oldLogger;
    }

    private void stubPollItem(final List<String> pollItemIds, final List<Serializable> pollItemWatermarks) {
        ((PollingSource)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                PollContext pollContext = (PollContext)invocation.getArgument(0, PollContext.class);
                for (int i = 0; i < pollItemIds.size(); ++i) {
                    String id = (String)pollItemIds.get(i);
                    Serializable watermark = (Serializable)pollItemWatermarks.get(i);
                    pollContext.accept(item -> {
                        if (id != null) {
                            ((PollContext.PollItem)item).setId(id);
                        }
                        if (watermark != null) {
                            ((PollContext.PollItem)item).setWatermark(watermark);
                        }
                        ((PollContext.PollItem)item).setResult(Result.builder().output((Object)"test").build());
                    });
                }
                return null;
            }
        }).when((Object)this.pollingSource)).poll((PollContext)ArgumentMatchers.any());
    }

    private void verifyLogMessage(List<String> messages, String expectedMessage, Object ... arguments) {
        MatcherAssert.assertThat(messages, (Matcher)CoreMatchers.hasItem((Object)this.formatMessage(expectedMessage, arguments)));
    }
}

