/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.extension.ic.internal.runtime.source;

import com.mulesoft.connectivity.mule.api.operation.OperationResult;
import com.mulesoft.connectivity.mule.api.operation.ResultError;
import com.mulesoft.connectivity.mule.api.trigger.NextData;
import com.mulesoft.connectivity.mule.api.trigger.TriggerItem;
import com.mulesoft.connectivity.mule.api.trigger.TriggerPage;
import com.mulesoft.connectivity.mule.persistence.model.MuleSourceSerializableModel;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessage;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.extension.api.connectivity.oauth.AccessTokenExpiredException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.PollContext;
import org.mule.runtime.extension.api.runtime.source.PollingSource;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.runtime.source.SourceFactoryContext;
import org.mule.runtime.extension.ic.internal.runtime.connection.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectivityPollingSource
extends PollingSource<Object, Object> {
    protected static final Logger LOGGER = LoggerFactory.getLogger(ConnectivityPollingSource.class);
    private final MuleSourceSerializableModel model;
    private final ConnectionProvider<?> connectionProvider;
    private final Map<String, Object> parameters;
    private Connection connection;

    public ConnectivityPollingSource(SourceFactoryContext ctx, MuleSourceSerializableModel model) {
        this.model = model;
        this.connectionProvider = (ConnectionProvider)ctx.getConnectionProvider().orElseThrow(() -> new IllegalArgumentException("UC Sources require connection providers"));
        this.parameters = new HashMap<String, Object>();
        ctx.getParameterization().forEachParameter((group, param, value) -> this.parameters.put(param.getName(), value));
    }

    protected void doStart() throws MuleException {
        LOGGER.debug("Starting PollingSource for '{}'", (Object)this.getModelReference());
        this.connection = (Connection)this.connectionProvider.connect();
    }

    protected void doStop() {
        LOGGER.debug("Stopping PollingSource '{}'", (Object)this.getModelReference());
    }

    public void poll(PollContext<Object, Object> context) {
        if (context.isSourceStopping()) {
            return;
        }
        context.setWatermarkComparator((a, b) -> this.connection.compareWatermarks(this.model, (Serializable)a, (Serializable)b));
        Serializable currentWatermark = context.getWatermark().map(this.connection::serializeWatermark).orElseGet(() -> {
            LOGGER.debug("First poll for '{}'", (Object)this.getModelReference());
            return this.connection.getInitialWatermark(this.model, this.parameters);
        });
        LOGGER.debug("Starting poll of '{}' with watermark '{}'", (Object)this.getModelReference(), this.connection.deserializeWatermark(currentWatermark));
        OperationResult<TriggerPage> result = this.connection.executeTrigger(this.model, currentWatermark, this.parameters);
        Optional<NextData> nextData = this.processResultAndGetNextData(result, context);
        while (!context.isSourceStopping() && nextData.isPresent() && nextData.get().getNextPage().isPresent()) {
            result = this.connection.executeTriggerNextPage(this.model, nextData.get());
            nextData = this.processResultAndGetNextData(result, context);
        }
    }

    public void onRejectedItem(Result<Object, Object> result, SourceCallbackContext sourceCallbackContext) {
        LOGGER.debug("Item Rejected: '{}'", result.getOutput());
    }

    private Optional<NextData> processResultAndGetNextData(OperationResult<TriggerPage> result, PollContext<Object, Object> context) {
        try {
            TriggerPage page = this.extractSuccessfulResult(result);
            this.acceptAllItems(page, context);
            return Optional.of(page.getNextData());
        }
        catch (AccessTokenExpiredException e) {
            context.onConnectionException(new ConnectionException((Throwable)e));
            return Optional.empty();
        }
    }

    private void acceptAllItems(TriggerPage page, PollContext<Object, Object> context) {
        for (TriggerItem pageItem : page.getItems()) {
            PollContext.PollItemStatus status = context.accept(pollItem -> {
                Result result = Result.builder().output(pageItem.getValue()).attributes(null).build();
                pollItem.setResult(result).setId(pageItem.getIdentity()).setWatermark((Serializable)pageItem.getWatermark());
            });
            if (status != PollContext.PollItemStatus.SOURCE_STOPPING) continue;
            return;
        }
    }

    private <T> T extractSuccessfulResult(OperationResult<T> result) {
        if (result.isSuccess()) {
            return (T)result.getValue();
        }
        if (this.connection.isTokenExpired(result)) {
            throw new AccessTokenExpiredException();
        }
        ResultError error = result.getErrorValue();
        Object errorValue = error.getValue();
        String errorDescription = error.getDescription().orElse("Unknown");
        I18nMessage message = I18nMessageFactory.createStaticMessage((String)"Polling operation failed: %s\n Cause: %s", (Object[])new Object[]{errorValue, errorDescription});
        throw new MuleRuntimeException(message);
    }

    private String getModelReference() {
        return this.model.getModelReference().orElse("no-model-reference");
    }
}

