/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.module.extension.internal.runtime.source.poll;

import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import javax.inject.Inject;
import javax.inject.Named;
import org.mule.runtime.api.component.execution.CompletableCallback;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lock.LockFactory;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.scheduler.SchedulingStrategy;
import org.mule.runtime.api.store.ObjectStore;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.api.store.ObjectStoreManager;
import org.mule.runtime.api.store.ObjectStoreSettings;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.exception.SystemExceptionHandler;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.core.internal.util.ConcurrencyUtils;
import org.mule.runtime.module.extension.internal.runtime.source.SourceCallbackContextAdapter;
import org.mule.runtime.module.extension.internal.runtime.source.SourceWrapper;
import org.mule.runtime.module.extension.internal.runtime.source.poll.DelegateRunnable;
import org.mule.runtime.module.extension.internal.runtime.source.poll.RestartContext;
import org.mule.runtime.module.extension.internal.runtime.source.poll.Restartable;
import org.mule.runtime.module.extension.internal.runtime.source.poll.WatermarkStatus;
import org.mule.sdk.api.runtime.operation.Result;
import org.mule.sdk.api.runtime.source.PollContext;
import org.mule.sdk.api.runtime.source.PollingSource;
import org.mule.sdk.api.runtime.source.SourceCallback;
import org.mule.sdk.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PollingSourceWrapper<T, A>
extends SourceWrapper<T, A>
implements Restartable {
    // Public constant; not referenced anywhere else in this class.
    public static final String ACCEPTED_POLL_ITEM_INFORMATION = "mule-polling-source-accepted-poll-item-information";
    // SLF4J message templates ({} placeholders) used by the logging calls of this class.
    public static final String REJECTED_ITEM_MESSAGE = "Item with id:[{}] is rejected with status:[{}]";
    public static final String ACCEPTED_ITEM_MESSAGE = "Item with id:[{}] is accepted";
    public static final String WATERMARK_SAVED_MESSAGE = "Watermark with key:[{}] and value:[{}] saved to the ObjectStore for flow:[{}]";
    public static final String WATERMARK_RETURNED_MESSAGE = "Watermark with key:[{}] and value:[{}] returned from the ObjectStore for flow:[{}]";
    public static final String WATERMARK_NOT_RETURNED_MESSAGE = "Watermark with key:[{}] not found on the ObjectStore for flow:[{}]";
    public static final String WATERMARK_REMOVED_MESSAGE = "Watermark with key:[{}] removed from the ObjectStore for flow:[{}]";
    public static final String WATERMARK_COMPARISON_MESSAGE = "Watermark comparison of {}:[{}] with {}:[{}] for flow:[{}] returns:[{}]";
    private static final Logger LOGGER = LoggerFactory.getLogger(PollingSourceWrapper.class);
    // Name of the callback-context variable under which acquireItem stores the ItemReleaser of an item.
    private static final String ITEM_RELEASER_CTX_VAR = "itemReleaser";
    // Name of the lock taken by updateRecentlyProcessedIds while the id stores are rewritten.
    private static final String UPDATE_PROCESSED_LOCK = "OSClearing";
    // Suffix passed to formatKey to build the name of the in-flight ids object store.
    private static final String INFLIGHT_IDS_OS_NAME_SUFFIX = "inflight-ids";
    // The wrapped polling source; lifecycle calls and poll() are forwarded to it.
    private final PollingSource<T, A> delegate;
    // Strategy used in onStart to schedule the poll runnable on the executor.
    private final SchedulingStrategy scheduler;
    // Maximum number of items DefaultPollContext.accept dispatches to the flow within one poll.
    private final int maxItemsPerPoll;
    // Receives RuntimeExceptions thrown by delegate.poll().
    private final SystemExceptionHandler systemExceptionHandler;
    @Inject
    private LockFactory lockFactory;
    @Inject
    @Named(value="_muleObjectStoreManager")
    private ObjectStoreManager objectStoreManager;
    @Inject
    private SchedulerService schedulerService;
    // Injected, but not used by any code visible in this class.
    @Inject
    private NotificationDispatcher notificationDispatcher;
    // The four stores below are obtained from the ObjectStoreManager in onStart.
    // Holds the "watermark" and "updatedWatermark" entries.
    private ObjectStore<Serializable> watermarkObjectStore;
    // Ids of items currently being processed (see acquireItem / ItemReleaser).
    private ObjectStore<Serializable> inflightIdsObjectStore;
    // Item id -> watermark of items accepted recently; consulted by passesWatermark.
    private ObjectStore<Serializable> recentlyProcessedIds;
    // Item id -> watermark of items accepted at the current highest watermark of the poll.
    private ObjectStore<Serializable> idsOnUpdatedWatermark;
    // NOTE(review): never assigned in this class — presumably populated by the runtime through injection; confirm.
    private ComponentLocation componentLocation;
    // Root container name of componentLocation, resolved in onStart.
    private String flowName;
    // Set in onStop and cleared in onStart; read by isRequestedToStop.
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    // Custom scheduler created in onStart (or handed over through a RestartContext).
    private Scheduler executor;
    // True between beginRestart/finishRestart and the following onStart, which clears it.
    private AtomicBoolean restarting = new AtomicBoolean(false);
    // Runnable scheduled on the executor; its delegate is swapped during restarts.
    private DelegateRunnable delegateRunnable;

    /**
     * Creates a wrapper around the given polling source.
     *
     * @param delegate               the polling source every call is forwarded to
     * @param scheduler              strategy used to schedule the polls when the source starts
     * @param maxItemsPerPoll        maximum number of items dispatched to the flow per poll
     * @param systemExceptionHandler handler notified when the delegate's poll throws a {@link RuntimeException}
     */
    public PollingSourceWrapper(PollingSource<T, A> delegate, SchedulingStrategy scheduler, int maxItemsPerPoll, SystemExceptionHandler systemExceptionHandler) {
        super(delegate);
        this.systemExceptionHandler = systemExceptionHandler;
        this.maxItemsPerPoll = maxItemsPerPoll;
        this.scheduler = scheduler;
        this.delegate = delegate;
    }

    /**
     * Starts the delegate, resolves the flow name, obtains the object stores used for watermarking and
     * idempotency, and arranges for {@code poll} to run.
     * <p>
     * When the {@code restarting} flag is set, the flag is cleared, one poll is executed on the calling thread
     * and the existing {@link DelegateRunnable} is pointed at the new callback. Otherwise a single-task custom
     * scheduler is created and a new {@link DelegateRunnable} is scheduled on it with the configured strategy.
     *
     * @param sourceCallback callback used to push polled items into the flow
     * @throws MuleException if the delegate or an object store lookup fails
     */
    public void onStart(SourceCallback<T, A> sourceCallback) throws MuleException {
        this.delegate.onStart(sourceCallback);
        // The flow name must be known before any formatKey(..) call below.
        this.flowName = this.componentLocation.getRootContainerName();

        this.inflightIdsObjectStore = this.objectStoreManager.getOrCreateObjectStore(this.formatKey(INFLIGHT_IDS_OS_NAME_SUFFIX), ObjectStoreSettings.unmanagedTransient());
        this.recentlyProcessedIds = this.objectStoreManager.getOrCreateObjectStore(this.formatKey("recently-processed-ids"), ObjectStoreSettings.unmanagedPersistent());
        this.idsOnUpdatedWatermark = this.objectStoreManager.getOrCreateObjectStore(this.formatKey("ids-on-updated-watermark"), ObjectStoreSettings.unmanagedPersistent());
        this.watermarkObjectStore = this.objectStoreManager.getOrCreateObjectStore(this.formatKey("watermark"), ObjectStoreSettings.unmanagedPersistent());

        this.stopRequested.set(false);

        boolean resumingAfterRestart = this.restarting.compareAndSet(true, false);
        if (!resumingAfterRestart) {
            SchedulerConfig executorConfig = SchedulerConfig.config()
                    .withMaxConcurrentTasks(1)
                    .withWaitAllowed(true)
                    .withName(this.formatKey("executor"));
            this.executor = this.schedulerService.customScheduler(executorConfig);
            this.delegateRunnable = new DelegateRunnable(() -> this.poll(sourceCallback));
            this.scheduler.schedule(this.executor, (Runnable)this.delegateRunnable);
            return;
        }

        // Restart path: poll once right away, then re-arm the already scheduled runnable.
        this.poll(sourceCallback);
        this.delegateRunnable.setDelegate(() -> this.poll(sourceCallback));
    }

    /**
     * Builds a flow-scoped name of the form {@code _pollingSource_<flowName>/<key>}.
     *
     * @param key the suffix identifying the resource (object store, lock or executor)
     * @return the name qualified with the current flow name
     */
    private String formatKey(String key) {
        return "_pollingSource_" + this.flowName + "/" + key;
    }

    /**
     * Flags the source as stopping, shuts the scheduler down unless a restart is in progress, and stops the
     * delegate. Any {@link Throwable} raised by the delegate is logged and not propagated.
     */
    public void onStop() {
        this.stopRequested.set(true);

        boolean restartInProgress = this.restarting.get();
        if (!restartInProgress) {
            this.shutdownScheduler();
            this.delegateRunnable = null;
        }

        try {
            this.delegate.onStop();
        } catch (Throwable t) {
            String message = String.format("Found error while stopping source at location '%s'. %s", this.flowName, t.getMessage());
            LOGGER.error(message, t);
        }
    }

    /**
     * Releases the item tracked in the given callback context (if any) and completes the callback.
     * The {@code event} and {@code parameters} arguments are not used.
     */
    @Override
    public void onTerminate(CoreEvent event, Map<String, Object> parameters, SourceCallbackContext context, CompletableCallback<Void> callback) {
        this.releaseOnCallback(context, callback);
    }

    /**
     * Releases the item tracked in the given callback context (if any) and completes the callback.
     * The {@code event} and {@code parameters} arguments are not used.
     */
    @Override
    public void onBackPressure(CoreEvent event, Map<String, Object> parameters, SourceCallbackContext context, CompletableCallback<Void> callback) {
        this.releaseOnCallback(context, callback);
    }

    /**
     * Runs the item releaser stored in {@code context} (if present) and then completes {@code callback}
     * with a {@code null} value.
     */
    private void releaseOnCallback(SourceCallbackContext context, CompletableCallback<Void> callback) {
        this.release(context);
        callback.complete(null);
    }

    /**
     * Executes one poll of the delegate while holding the watermark lock.
     * <p>
     * Nothing happens when a stop has been requested. A {@link RuntimeException} thrown by the delegate is
     * logged, handed to the system exception handler, and ends the poll without touching the watermark.
     * Otherwise, unless a stop was requested in the meantime, the watermark gathered by the poll context is
     * persisted; failures while doing so are logged and swallowed.
     *
     * @param sourceCallback callback used to push accepted items into the flow
     */
    private void poll(SourceCallback<T, A> sourceCallback) {
        if (this.isRequestedToStop()) {
            return;
        }
        this.withWatermarkLock(() -> {
            Serializable watermarkAtStart = this.getCurrentWatermark();
            Serializable updatedWatermarkAtStart = this.getUpdatedWatermark();
            DefaultPollContext pollContext = new DefaultPollContext(sourceCallback, watermarkAtStart, updatedWatermarkAtStart);

            try {
                this.delegate.poll((PollContext)pollContext);
            } catch (RuntimeException e) {
                LOGGER.error(String.format("Found exception trying to process item on source at flow '%s'. %s", this.flowName, e.getMessage()), (Throwable)e);
                this.systemExceptionHandler.handleException((Exception)e, this.componentLocation);
                return;
            }

            try {
                if (this.isRequestedToStop()) {
                    return;
                }
                Optional<Serializable> newWatermark = pollContext.getUpdatedWatermark();
                if (newWatermark.isPresent()) {
                    this.updateWatermark(newWatermark.get(), pollContext.getWatermarkComparator(), pollContext.getMinimumRejectedByLimitPassingWatermark().orElse(null));
                }
            } catch (Throwable t) {
                LOGGER.error(String.format("Found exception trying to process item on source at flow '%s'. %s", this.flowName, t.getMessage()), t);
            }
        });
    }

    /**
     * Compares two watermark values and traces the outcome.
     * <p>
     * When no comparator is supplied, both values must implement {@link Comparable} so that their natural
     * order can be used. Checking for {@code Serializable} (as this code previously did) is always true for
     * non-null arguments and let non-comparable values fail later with a {@link ClassCastException}.
     *
     * @param w1Label    label of the first value, used only for the trace message
     * @param w1         first watermark value
     * @param w2Label    label of the second value, used only for the trace message
     * @param w2         second watermark value
     * @param comparator comparator to use, or {@code null} to fall back to natural ordering
     * @return the comparator result for {@code (w1, w2)}
     * @throws IllegalStateException if no comparator is given and either value is not {@link Comparable}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private int compareWatermarks(String w1Label, Serializable w1, String w2Label, Serializable w2, Comparator comparator) throws IllegalArgumentException {
        if (comparator == null) {
            if (!(w1 instanceof Comparable) || !(w2 instanceof Comparable)) {
                throw new IllegalStateException(String.format("Non comparable watermark values [%s, %s] were provided on source at flow '%s'. Use comparable values or set a custom comparator. Watermark not updated.", w1, w2, this.flowName));
            }
            comparator = Comparator.naturalOrder();
        }
        int result = comparator.compare(w1, w2);
        LOGGER.trace(WATERMARK_COMPARISON_MESSAGE, new Object[]{w1Label, w1, w2Label, w2, this.flowName, result});
        return result;
    }

    /**
     * Marks this wrapper as restarting, detaches the poll logic from the scheduled runnable, and captures the
     * executor and runnable so they can be handed over through {@link #finishRestart(RestartContext)}.
     *
     * @return a context holding the current executor and delegate runnable
     */
    @Override
    public RestartContext beginRestart() {
        this.restarting.set(true);
        DelegateRunnable scheduledRunnable = this.delegateRunnable;
        scheduledRunnable.setDelegate(null);
        return new RestartContext(this.executor, scheduledRunnable);
    }

    /**
     * Installs the executor and delegate runnable carried by the given restart context on this wrapper.
     * <p>
     * The {@code restarting} flag is set to {@code true} here rather than cleared: {@code onStart} consumes it
     * through {@code compareAndSet(true, false)} to reuse the restored runnable instead of creating a new
     * scheduler. NOTE(review): presumably this is invoked on the wrapper that is started next — confirm
     * against the caller.
     *
     * @param restartContext the context produced by {@link #beginRestart()}
     */
    @Override
    public void finishRestart(RestartContext restartContext) {
        this.restarting.set(true);
        this.executor = restartContext.getExecutor();
        this.delegateRunnable = restartContext.getDelegateRunnable();
    }

    /**
     * Resolves a printable id for the given item: the explicit item id when one was set, otherwise the
     * {@code toString()} of the result attributes, otherwise an empty string.
     *
     * @param pollItem the item being processed
     * @return the id to use in log messages, never {@code null}
     */
    private String getItemId(DefaultPollItem pollItem) {
        Optional<String> explicitId = pollItem.getItemId();
        if (explicitId.isPresent()) {
            return explicitId.get();
        }
        return pollItem.getResult().getAttributes().map(Object::toString).orElse("");
    }

    /**
     * Notifies the delegate that an item was rejected and, whatever the outcome, releases the item and the
     * connection bound to the callback context (the latter only for {@link SourceCallbackContextAdapter}s).
     *
     * @param result  the rejected item
     * @param context the callback context created for the item
     */
    private void rejectItem(Result<T, A> result, SourceCallbackContext context) {
        try {
            this.delegate.onRejectedItem(result, context);
        } finally {
            this.release(context);
            if (context instanceof SourceCallbackContextAdapter) {
                SourceCallbackContextAdapter adapter = (SourceCallbackContextAdapter)context;
                adapter.releaseConnection();
            }
        }
    }

    /**
     * Runs the {@link ItemReleaser} stored in the callback context, if there is one.
     * <p>
     * The explicit {@code <ItemReleaser>} type witness is required: without it the generic
     * {@code getVariable} call yields an {@code Optional<Object>} and the {@code ItemReleaser::release}
     * method reference cannot be applied.
     *
     * @param context the callback context of the item to release
     */
    private void release(SourceCallbackContext context) {
        context.<ItemReleaser>getVariable(ITEM_RELEASER_CTX_VAR).ifPresent(ItemReleaser::release);
    }

    /**
     * Runs the given task while holding the watermark lock, unlocking afterwards even if the task fails.
     *
     * @param runnable the task to execute under the lock
     */
    private void withWatermarkLock(CheckedRunnable runnable) {
        final Lock watermarkLock = this.getWatermarkLock();
        watermarkLock.lock();
        try {
            runnable.run();
        } finally {
            ConcurrencyUtils.safeUnlock(watermarkLock);
        }
    }

    /**
     * Obtains from the lock factory the lock guarding watermark updates for this flow.
     *
     * @return the lock named after the flow-scoped {@code watermark} key
     */
    private Lock getWatermarkLock() {
        String lockName = this.formatKey("watermark");
        return this.lockFactory.createLock(lockName);
    }

    /**
     * Persists the watermark at the end of a poll.
     * <p>
     * If items were rejected because of the per-poll limit, the lowest watermark among them becomes the
     * current watermark; otherwise the regular update with {@code value} is performed.
     *
     * @param value                                  the highest watermark found during the poll
     * @param comparator                             comparator for watermark values, may be {@code null}
     * @param minimumRejectedByLimitPassingWatermark lowest watermark rejected by the limit, or {@code null}
     * @throws MuleRuntimeException if the object store fails
     */
    private void updateWatermark(Serializable value, Comparator comparator, Serializable minimumRejectedByLimitPassingWatermark) {
        try {
            if (minimumRejectedByLimitPassingWatermark == null) {
                this.updateWatermark(value, comparator);
                return;
            }
            LOGGER.debug("During the poll in the flow {}, items were rejected due to the item limit, a lower watermark than the maximum found willhave to be the new current watermark to ensure that those items are not left without being processed.", (Object)this.flowName);
            this.setCurrentWatermarkAsMinimumRejectWatermark(minimumRejectedByLimitPassingWatermark);
        } catch (ObjectStoreException e) {
            String description = String.format("Failed to update watermark value for message source at location '%s'. %s", this.flowName, e.getMessage());
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(description), (Throwable)e);
        }
    }

    /**
     * Stores {@code value} as the current watermark unless the stored watermark is already greater than or
     * equal to it. When the watermark is replaced (or none existed), the recently processed ids are refreshed
     * first.
     *
     * @param value      the candidate watermark
     * @param comparator comparator for watermark values, may be {@code null}
     * @throws ObjectStoreException if the object store fails
     */
    private void updateWatermark(Serializable value, Comparator comparator) throws ObjectStoreException {
        boolean hasStoredWatermark = this.watermarkObjectStore.contains("watermark");
        if (hasStoredWatermark) {
            Serializable storedWatermark = this.watermarkObjectStore.retrieve("watermark");
            int comparison = this.compareWatermarks("currentValue", storedWatermark, "value", value, comparator);
            if (comparison >= 0) {
                // The stored watermark is not older than the candidate: keep it.
                return;
            }
            this.watermarkObjectStore.remove("watermark");
        }
        this.updateRecentlyProcessedIds();
        this.saveWatermark("watermark", value);
    }

    /**
     * Replaces the current watermark with the lowest watermark of the items rejected by the per-poll limit.
     *
     * @param minimumRejectedByLimitPassingWatermark the watermark to store as current
     * @throws ObjectStoreException if the object store fails
     */
    private void setCurrentWatermarkAsMinimumRejectWatermark(Serializable minimumRejectedByLimitPassingWatermark) throws ObjectStoreException {
        final String currentWatermarkKey = "watermark";
        this.removeWatermark(currentWatermarkKey);
        this.saveWatermark(currentWatermarkKey, minimumRejectedByLimitPassingWatermark);
    }

    /**
     * Replaces the content of the recently-processed-ids store with the entries currently held in the
     * ids-on-updated-watermark store and then empties the latter, all under the {@code OSClearing} lock.
     * <p>
     * The keys are read into a typed {@code List<String>} so they can be passed to the object store, and the
     * lock is acquired before entering the {@code try} block so that {@code finally} only unlocks a lock that
     * was actually taken.
     *
     * @throws ObjectStoreException if listing keys or clearing a store fails
     * @throws MuleRuntimeException if copying an individual entry fails
     */
    private void updateRecentlyProcessedIds() throws ObjectStoreException {
        Lock osClearingLock = this.lockFactory.createLock(UPDATE_PROCESSED_LOCK);
        osClearingLock.lock();
        try {
            List<String> idsOnHighWatermark = this.idsOnUpdatedWatermark.allKeys();
            this.recentlyProcessedIds.clear();
            idsOnHighWatermark.forEach(key -> {
                try {
                    this.recentlyProcessedIds.store(key, this.idsOnUpdatedWatermark.retrieve(key));
                } catch (ObjectStoreException e) {
                    // forEach cannot propagate the checked exception, so it is wrapped.
                    throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("An error occurred while updating the watermark Ids. Failed to update key '%s' in Watermark-IDs ObjectStore: %s", key, e.getMessage()), e);
                }
            });
            this.idsOnUpdatedWatermark.clear();
        } finally {
            ConcurrencyUtils.safeUnlock(osClearingLock);
        }
    }

    /**
     * Reads the watermark stored under the given key.
     *
     * @param watermarkKey key of the watermark in the watermark object store
     * @return the stored value, or {@code null} if the store has no entry for the key
     * @throws MuleRuntimeException if the object store fails
     */
    private Serializable getWatermark(String watermarkKey) {
        try {
            if (!this.watermarkObjectStore.contains(watermarkKey)) {
                LOGGER.trace(WATERMARK_NOT_RETURNED_MESSAGE, watermarkKey, this.flowName);
                return null;
            }
            Serializable storedWatermark = this.watermarkObjectStore.retrieve(watermarkKey);
            LOGGER.trace(WATERMARK_RETURNED_MESSAGE, watermarkKey, storedWatermark, this.flowName);
            return storedWatermark;
        } catch (ObjectStoreException e) {
            String description = String.format("Failed to fetch watermark for Message source at location '%s'. %s", this.flowName, e.getMessage());
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(description), (Throwable)e);
        }
    }

    /**
     * Stores a watermark value under the given key and traces the operation.
     *
     * @param watermarkKey   key of the watermark in the watermark object store
     * @param watermarkValue value to store
     * @throws ObjectStoreException if the object store rejects the value
     */
    private void saveWatermark(String watermarkKey, Serializable watermarkValue) throws ObjectStoreException {
        this.watermarkObjectStore.store(watermarkKey, watermarkValue);
        LOGGER.trace(WATERMARK_SAVED_MESSAGE, watermarkKey, watermarkValue, this.flowName);
    }

    /**
     * Removes the watermark stored under the given key, if there is one, and traces the removal.
     *
     * @param watermarkKey key of the watermark in the watermark object store
     * @throws ObjectStoreException if the object store fails
     */
    private void removeWatermark(String watermarkKey) throws ObjectStoreException {
        if (!this.watermarkObjectStore.contains(watermarkKey)) {
            return;
        }
        this.watermarkObjectStore.remove(watermarkKey);
        LOGGER.trace(WATERMARK_REMOVED_MESSAGE, watermarkKey, this.flowName);
    }

    /**
     * @return the value stored under the {@code "watermark"} key, or {@code null} if there is none
     */
    private Serializable getCurrentWatermark() {
        return this.getWatermark("watermark");
    }

    /**
     * @return the value stored under the {@code "updatedWatermark"} key, or {@code null} if there is none
     */
    private Serializable getUpdatedWatermark() {
        return this.getWatermark("updatedWatermark");
    }

    /**
     * Tries to claim an item for processing so that it is not handled twice concurrently.
     * <p>
     * Items without an id are always accepted. Otherwise a lock named after the flow and the item id is
     * tried; if it is obtained and the id is not already registered in the in-flight store, the id is
     * registered and an {@link ItemReleaser} is attached to the callback context. The lock is released before
     * returning in every case.
     *
     * @param pollItem        the item to claim
     * @param callbackContext the callback context of the item, which receives the releaser
     * @return {@code true} if the item may be processed; {@code false} if it is locked, already in flight, or
     *         an error prevented tracking it
     */
    private boolean acquireItem(DefaultPollItem pollItem, SourceCallbackContext callbackContext) {
        Optional<String> explicitId = pollItem.getItemId();
        if (!explicitId.isPresent()) {
            return true;
        }

        String id = explicitId.get();
        Lock itemLock = this.lockFactory.createLock(this.flowName + "/" + id);
        if (!itemLock.tryLock()) {
            LOGGER.debug("Source at flow '{}' is skipping processing of item '{}' because another thread or node already has a mule lock on it", this.flowName, id);
            return false;
        }

        try {
            if (this.inflightIdsObjectStore.contains(id)) {
                LOGGER.debug("Source at flow '{}' polled item '{}', but skipping it since it is already being processed in another thread or node", this.flowName, id);
                return false;
            }
            this.inflightIdsObjectStore.store(id, id);
            callbackContext.addVariable(ITEM_RELEASER_CTX_VAR, new ItemReleaser(id));
            return true;
        } catch (Exception e) {
            LOGGER.error(String.format("Could not guarantee idempotency for item '%s' for source at flow '%s'. '%s", id, this.flowName, e.getMessage()), (Throwable)e);
            return false;
        } finally {
            ConcurrencyUtils.safeUnlock(itemLock);
        }
    }

    /**
     * @return {@code true} if a stop was requested or the current thread has been interrupted
     */
    private boolean isRequestedToStop() {
        if (this.stopRequested.get()) {
            return true;
        }
        return Thread.currentThread().isInterrupted();
    }

    /**
     * Stops the polling executor, if one exists, and drops the reference to it.
     */
    private void shutdownScheduler() {
        if (this.executor == null) {
            return;
        }
        this.executor.stop();
        this.executor = null;
    }

    /**
     * Maps a poll item status to a numeric notification type.
     * NOTE(review): the numeric values are presumably notification action ids defined elsewhere — confirm.
     *
     * @param status                      the status of the item
     * @param currentPollItemLimitApplied not used by this mapping
     * @return the code for the status, or {@link Integer#MIN_VALUE} for any status not listed
     */
    private int statusToNotificationType(PollContext.PollItemStatus status, boolean currentPollItemLimitApplied) {
        int notificationType;
        switch (status) {
            case ACCEPTED:
                notificationType = 2304;
                break;
            case SOURCE_STOPPING:
                notificationType = 2305;
                break;
            case ALREADY_IN_PROCESS:
                notificationType = 2306;
                break;
            case FILTERED_BY_WATERMARK:
                notificationType = 2307;
                break;
            default:
                notificationType = Integer.MIN_VALUE;
                break;
        }
        return notificationType;
    }

    /**
     * {@link PollContext.PollItem} implementation that the delegate source fills in for each polled item:
     * the result to dispatch, an optional watermark, and an optional id.
     */
    private class DefaultPollItem implements PollContext.PollItem<T, A> {
        private final SourceCallbackContext sourceCallbackContext;
        private Result<T, A> result;
        private Serializable watermark;
        private String itemId;

        private DefaultPollItem(SourceCallbackContext sourceCallbackContext) {
            this.sourceCallbackContext = sourceCallbackContext;
        }

        /** @return the callback context created for this item */
        public SourceCallbackContext getSourceCallbackContext() {
            return this.sourceCallbackContext;
        }

        /**
         * Sets the result to dispatch for this item.
         *
         * @param result the item result; must not be {@code null}
         * @return this item
         */
        public PollContext.PollItem<T, A> setResult(Result<T, A> result) {
            boolean resultProvided = result != null;
            Preconditions.checkArgument(resultProvided, "Cannot set a null Result");
            this.result = result;
            return this;
        }

        /**
         * Sets the watermark associated with this item.
         *
         * @param watermark the item watermark; must not be {@code null}
         * @return this item
         */
        public PollContext.PollItem<T, A> setWatermark(Serializable watermark) {
            boolean watermarkProvided = watermark != null;
            Preconditions.checkArgument(watermarkProvided, "Cannot set a null watermark");
            this.watermark = watermark;
            return this;
        }

        /**
         * Sets the id of this item.
         *
         * @param id the item id; must not be {@code null}
         * @return this item
         */
        public PollContext.PollItem<T, A> setId(String id) {
            boolean idProvided = id != null;
            Preconditions.checkArgument(idProvided, "Cannot set a null id");
            this.itemId = id;
            return this;
        }

        private Optional<Serializable> getWatermark() {
            return Optional.ofNullable(this.watermark);
        }

        private Optional<String> getItemId() {
            return Optional.ofNullable(this.itemId);
        }

        private Result<T, A> getResult() {
            return this.result;
        }

        /**
         * Verifies that a result was configured.
         *
         * @throws IllegalStateException if no result was set
         */
        private void validate() {
            if (this.result != null) {
                return;
            }
            throw new IllegalStateException(String.format("Missing item Result. Source in flow '%s' pushed an item with ID '%s' without configuring its Result", PollingSourceWrapper.this.flowName, this.itemId));
        }
    }

    private class ItemReleaser {
        private final String id;

        private ItemReleaser(String id) {
            this.id = id;
        }

        private void release() {
            try {
                if (PollingSourceWrapper.this.inflightIdsObjectStore.contains(this.id)) {
                    PollingSourceWrapper.this.inflightIdsObjectStore.remove(this.id);
                }
            }
            catch (ObjectStoreException e) {
                LOGGER.error(String.format("Could not untrack item '%s' in source at flow '%s'. %s", this.id, PollingSourceWrapper.this.flowName, e.getMessage()), (Throwable)e);
            }
        }
    }

    private class DefaultPollContext
    implements PollContext<T, A> {
        private final SourceCallback<T, A> sourceCallback;
        private Serializable currentWatermark;
        private Serializable updatedWatermark;
        private Serializable minimumRejectedByLimitPassingWatermark;
        private Comparator<Serializable> watermarkComparator = null;
        private ZonedDateTime timestamp;
        private int currentPollItems;

        private DefaultPollContext(SourceCallback<T, A> sourceCallback, Serializable currentWatermark, Serializable updatedWatermark) {
            this.sourceCallback = sourceCallback;
            this.currentWatermark = currentWatermark;
            this.updatedWatermark = updatedWatermark;
            this.currentPollItems = 0;
            this.minimumRejectedByLimitPassingWatermark = null;
            this.timestamp = ZonedDateTime.now();
        }

        public String getPollId() {
            return PollingSourceWrapper.this.componentLocation.getRootContainerName() + " @ " + this.timestamp;
        }

        public PollContext.PollItemStatus accept(Consumer<PollContext.PollItem<T, A>> consumer) {
            SourceCallbackContext callbackContext = this.sourceCallback.createContext();
            DefaultPollItem pollItem = new DefaultPollItem(callbackContext);
            consumer.accept(pollItem);
            pollItem.validate();
            String itemId = PollingSourceWrapper.this.getItemId(pollItem);
            PollContext.PollItemStatus status = PollContext.PollItemStatus.ACCEPTED;
            boolean currentPollItemLimitApplied = false;
            if (PollingSourceWrapper.this.isRequestedToStop()) {
                status = PollContext.PollItemStatus.SOURCE_STOPPING;
            } else if (!PollingSourceWrapper.this.acquireItem(pollItem, callbackContext)) {
                status = PollContext.PollItemStatus.ALREADY_IN_PROCESS;
            } else {
                WatermarkStatus watermarkStatus = this.passesWatermark(pollItem);
                if (watermarkStatus == WatermarkStatus.REJECT) {
                    status = PollContext.PollItemStatus.FILTERED_BY_WATERMARK;
                } else if (this.currentPollItems < PollingSourceWrapper.this.maxItemsPerPoll) {
                    ++this.currentPollItems;
                    this.sourceCallback.handle(pollItem.getResult(), callbackContext);
                    this.saveWatermarkValue(watermarkStatus, pollItem);
                } else {
                    currentPollItemLimitApplied = true;
                    this.processLimitApplied(watermarkStatus, pollItem);
                }
            }
            if (status != PollContext.PollItemStatus.ACCEPTED || currentPollItemLimitApplied) {
                LOGGER.debug(PollingSourceWrapper.REJECTED_ITEM_MESSAGE, (Object)itemId, (Object)status);
                PollingSourceWrapper.this.rejectItem(pollItem.getResult(), callbackContext);
            } else {
                LOGGER.debug(PollingSourceWrapper.ACCEPTED_ITEM_MESSAGE, (Object)itemId);
            }
            return status;
        }

        private void processLimitApplied(WatermarkStatus watermarkStatus, DefaultPollItem pollItem) {
            Serializable itemWatermark = pollItem.getWatermark().orElse(null);
            if (itemWatermark == null || watermarkStatus != WatermarkStatus.PASSED) {
                return;
            }
            if (this.minimumRejectedByLimitPassingWatermark == null || PollingSourceWrapper.this.compareWatermarks("itemWatermark", itemWatermark, "minimumRejectedByLimitPassingWatermark", this.minimumRejectedByLimitPassingWatermark, this.watermarkComparator) < 0) {
                LOGGER.debug("An item that passed all previous validations is being rejected by the poll limit and its watermarkvalue will be stored so that is processed on future polls if sent for processing.");
                this.minimumRejectedByLimitPassingWatermark = itemWatermark;
            }
        }

        private void saveWatermarkValue(WatermarkStatus watermarkStatus, DefaultPollItem pollItem) {
            String itemId = pollItem.getItemId().orElse(null);
            Serializable itemWatermark = pollItem.getWatermark().orElse(null);
            if (itemWatermark == null) {
                return;
            }
            switch (watermarkStatus) {
                case ON_NEW_HIGH: {
                    this.renewUpdatedWatermark(itemWatermark);
                    LOGGER.debug("A new watermark maximum has been found when processing item with id {} for source in flow {}", (Object)itemId, (Object)PollingSourceWrapper.this.flowName);
                }
                case ON_HIGH: {
                    this.addToUpdatedWatermark(itemId, itemWatermark);
                    LOGGER.debug("Watermark value for item with id {} is equal to the maximum value found for source in flow {}", (Object)itemId, (Object)PollingSourceWrapper.this.flowName);
                }
                case PASSED: {
                    this.addToRecentlyProcessedIds(itemId, itemWatermark);
                    LOGGER.debug("Item with id {} passed the watermark validation and will be processed in flow {}", (Object)itemId, (Object)PollingSourceWrapper.this.flowName);
                }
            }
        }

        private void renewUpdatedWatermark(Serializable itemWatermark) {
            try {
                PollingSourceWrapper.this.idsOnUpdatedWatermark.clear();
                this.updatedWatermark = itemWatermark;
                PollingSourceWrapper.this.removeWatermark("updatedWatermark");
                PollingSourceWrapper.this.saveWatermark("updatedWatermark", this.updatedWatermark);
            }
            catch (ObjectStoreException e) {
                throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)"An error occurred while trying to update the updatedWatermark in the the object store"), (Throwable)e);
            }
        }

        private void addToUpdatedWatermark(String itemId, Serializable itemWatermark) {
            if (itemId != null) {
                try {
                    PollingSourceWrapper.this.idsOnUpdatedWatermark.store(itemId, itemWatermark);
                }
                catch (ObjectStoreException e) {
                    throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)"An error occurred while updating the watermark for Item with ID [%s]", (Object[])new Object[]{itemId}), (Throwable)e);
                }
            }
        }

        private void addToRecentlyProcessedIds(String itemId, Serializable itemWatermark) {
            try {
                if (itemId != null) {
                    if (PollingSourceWrapper.this.recentlyProcessedIds.contains(itemId)) {
                        PollingSourceWrapper.this.recentlyProcessedIds.remove(itemId);
                    }
                    PollingSourceWrapper.this.recentlyProcessedIds.store(itemId, itemWatermark);
                }
            }
            catch (ObjectStoreException e) {
                throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)"An error occurred while updating the watermark for Item with ID [%s]", (Object[])new Object[]{itemId}), (Throwable)e);
            }
        }

        public boolean isSourceStopping() {
            return PollingSourceWrapper.this.isRequestedToStop();
        }

        public Optional<Serializable> getWatermark() {
            return Optional.ofNullable(this.currentWatermark);
        }

        public void setWatermarkComparator(Comparator<? extends Serializable> comparator) {
            Preconditions.checkArgument((comparator != null ? 1 : 0) != 0, (String)"Cannot set a null watermark comparator");
            this.watermarkComparator = comparator;
        }

        public void onConnectionException(ConnectionException e) {
            this.sourceCallback.onConnectionException(e);
        }

        public Optional<Serializable> getMinimumRejectedByLimitPassingWatermark() {
            return Optional.ofNullable(this.minimumRejectedByLimitPassingWatermark);
        }

        private Optional<Serializable> getUpdatedWatermark() {
            return Optional.ofNullable(this.updatedWatermark);
        }

        private Comparator<Serializable> getWatermarkComparator() {
            return this.watermarkComparator;
        }

        /*
         * Unable to fully structure code
         */
        private WatermarkStatus passesWatermark(DefaultPollItem pollItem) {
            itemWatermark = pollItem.getWatermark().orElse(null);
            if (itemWatermark == null) {
                return WatermarkStatus.PASSED;
            }
            itemId = pollItem.getItemId().orElse(null);
            status = WatermarkStatus.PASSED;
            if (this.currentWatermark == null && this.updatedWatermark == null) {
                status = WatermarkStatus.ON_NEW_HIGH;
            } else {
                v0 = compare = this.currentWatermark != null ? PollingSourceWrapper.this.compareWatermarks("currentWatermark", this.currentWatermark, "itemWatermark", itemWatermark, this.watermarkComparator) : -1;
                if (compare < 0) {
                    try {
                        if (itemId != null && PollingSourceWrapper.this.recentlyProcessedIds.contains(itemId) && PollingSourceWrapper.this.compareWatermarks("itemWatermark", itemWatermark, "previousItemWatermark", previousItemWatermark = PollingSourceWrapper.this.recentlyProcessedIds.retrieve(itemId), this.watermarkComparator) <= 0) {
                            status = WatermarkStatus.REJECT;
                        }
                        if (status == WatermarkStatus.REJECT) ** GOTO lbl31
                        v1 = updatedWatermarkCompare = this.updatedWatermark != null ? PollingSourceWrapper.this.compareWatermarks("updatedWatermark", this.updatedWatermark, "itemWatermark", itemWatermark, this.watermarkComparator) : -1;
                        if (updatedWatermarkCompare == 0) {
                            status = WatermarkStatus.ON_HIGH;
                        }
                        if (updatedWatermarkCompare >= 0) ** GOTO lbl31
                        status = WatermarkStatus.ON_NEW_HIGH;
                    }
                    catch (ObjectStoreException e) {
                        throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)"An error occurred while checking the previous watermark for an item id that was recently processed. Item with ID [%s]", (Object[])new Object[]{itemId}), (Throwable)e);
                    }
                } else if (compare == 0 && pollItem.getItemId().isPresent()) {
                    try {
                        status = PollingSourceWrapper.this.recentlyProcessedIds.contains(itemId) != false ? WatermarkStatus.REJECT : WatermarkStatus.PASSED;
                    }
                    catch (ObjectStoreException e) {
                        throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)"An error occurred while checking the existence for Item with ID [%s]", (Object[])new Object[]{itemId}), (Throwable)e);
                    }
                } else {
                    status = WatermarkStatus.REJECT;
                }
            }
lbl31:
            // 7 sources

            if (status == WatermarkStatus.REJECT && PollingSourceWrapper.LOGGER.isDebugEnabled()) {
                itemId = PollingSourceWrapper.this.getItemId(pollItem);
                PollingSourceWrapper.LOGGER.debug("Source in flow '{}' is skipping item '{}' because it was rejected by the watermark", (Object)PollingSourceWrapper.this.flowName, (Object)itemId);
            }
            return status;
        }
    }
}

