/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectivity.rest.commons.api.source;

import com.mulesoft.connectivity.rest.commons.api.config.RestConfiguration;
import com.mulesoft.connectivity.rest.commons.api.connection.RestConnection;
import com.mulesoft.connectivity.rest.commons.api.operation.HttpResponseResult;
import com.mulesoft.connectivity.rest.commons.api.source.RestPollingSourceStrategy;
import com.mulesoft.connectivity.rest.commons.internal.http.HttpResponseAttributes;
import com.mulesoft.connectivity.rest.commons.internal.util.MediaTypeUtils;
import com.mulesoft.connectivity.rest.commons.internal.util.RestUtils;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import javax.inject.Inject;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.el.ExpressionLanguage;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.core.api.util.StringUtils;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.connectivity.oauth.AccessTokenExpiredException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.PollContext;
import org.mule.runtime.extension.api.runtime.source.PollingSource;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for polling sources that fetch a page over HTTP on every poll and emit one
 * message per item extracted from that page.
 *
 * <p>On each {@link #poll(PollContext)} the source asks the subclass for the request to send
 * ({@link #getRequest(Optional)}), sends it through the {@link RestConnection}, reads the whole
 * response body as a string and delegates item extraction, watermarking, identity and attribute
 * resolution to the {@link RestPollingSourceStrategy} supplied by
 * {@link #getRestPollingSourceStrategy()}.
 *
 * @param <C> the connection type used to send requests
 * @param <S> the watermark type
 * @param <A> the attributes type attached to every emitted item
 */
public abstract class RestPollingSource<C extends RestConnection, S extends Serializable, A>
extends PollingSource<InputStream, A> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RestPollingSource.class);

    /** Provider used to open the connection on start and to release it on stop. */
    @Connection
    protected ConnectionProvider<C> connectionProvider;

    @Inject
    private ExpressionLanguage expressionLanguage;

    /** Connection obtained in {@link #doStart()}; {@code null} while the source is not started. */
    protected C connection;

    @Config
    protected RestConfiguration config;

    /** Strategy resolved in {@link #doStart()} and used by every poll. */
    protected RestPollingSourceStrategy<S, A> restPollingSourceStrategy;

    /**
     * @return the injected expression language instance
     */
    protected ExpressionLanguage getExpressionLanguage() {
        return this.expressionLanguage;
    }

    /**
     * Supplies the strategy that knows how to read watermarks, extract items and compute each
     * item's watermark, identity and attributes. Called once from {@link #doStart()}.
     *
     * @return the strategy to use for every poll
     * @throws MuleException if the strategy cannot be created
     */
    protected abstract RestPollingSourceStrategy<S, A> getRestPollingSourceStrategy() throws MuleException;

    /**
     * @return the media type handed to {@code MediaTypeUtils.resolveDefaultResponseMediaType}
     *         together with the configuration; {@code application/json} unless overridden
     */
    protected MediaType getDefaultResponseMediaType() {
        return MediaType.APPLICATION_JSON;
    }

    /**
     * Opens the connection and resolves the polling strategy.
     *
     * <p>If resolving the strategy fails, the connection that was just opened is released before
     * the failure is propagated, so a failed start does not leave a connection behind.
     *
     * @throws MuleException if connecting or creating the strategy fails
     */
    @Override
    protected void doStart() throws MuleException {
        this.connection = this.connectionProvider.connect();
        try {
            this.restPollingSourceStrategy = this.getRestPollingSourceStrategy();
        }
        catch (MuleException | RuntimeException e) {
            // NOTE(review): assumes doStop() is not guaranteed to run after a failed start,
            // hence the explicit cleanup here - confirm against the runtime's lifecycle.
            this.releaseConnection();
            throw e;
        }
    }

    /**
     * Releases the connection opened by {@link #doStart()}, if there is one.
     */
    @Override
    protected void doStop() {
        this.releaseConnection();
    }

    /**
     * Hands the current connection back to the provider and clears the field so the same
     * connection is never disconnected twice. Does nothing when no connection is held.
     */
    private void releaseConnection() {
        C current = this.connection;
        if (current == null) {
            return;
        }
        this.connection = null;
        this.connectionProvider.disconnect(current);
    }

    /**
     * Sends the request through the connection, translating any failure into a
     * {@link MuleRuntimeException} that names the concrete source class.
     *
     * @param httpRequest the request to send
     * @param pollContext the current poll context; notified when the access token has expired
     * @param defaultMediaType the media type passed to the connection's {@code send} call
     * @return the HTTP response
     * @throws MuleRuntimeException if sending fails for any reason
     */
    private HttpResponseResult send(HttpRequest httpRequest, PollContext<InputStream, A> pollContext, MediaType defaultMediaType) {
        String triggerIdentifier = this.getClass().getSimpleName();
        try {
            return this.connection.send(httpRequest, defaultMediaType);
        }
        catch (AccessTokenExpiredException e) {
            // Report the expired token to the poll context as a connection problem before failing this poll.
            pollContext.onConnectionException(new ConnectionException(e));
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Exception found while executing poll: '%s'. Access token expiration. '%s'", triggerIdentifier, e.getMessage())), e);
        }
        catch (Exception e) {
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Exception found while executing poll: '%s'. '%s'", triggerIdentifier, e.getMessage())), e);
        }
    }

    /**
     * Executes one poll: sends the request built from the last watermark, reads the response body
     * and pushes every extracted item into the poll context. A blank response body produces no items.
     *
     * @param pollContext the context that receives the items
     * @throws IllegalArgumentException if the strategy returns {@code null} instead of a list
     * @throws MuleRuntimeException if the request cannot be sent
     */
    @Override
    public void poll(PollContext<InputStream, A> pollContext) {
        Optional<S> lastWatermark = this.restPollingSourceStrategy.getLastWatermark().apply(pollContext);
        MediaType defaultMediaType = MediaTypeUtils.resolveDefaultResponseMediaType(this.config, this.getDefaultResponseMediaType());
        HttpResponseResult result = this.send(this.getRequest(lastWatermark), pollContext, defaultMediaType);
        TypedValue<String> rawPage = RestUtils.consumeStringAndClose(result.getEntityContent(), result.getMediaType(), result.getCharset());
        if (StringUtils.isBlank(rawPage.getValue())) {
            return;
        }

        HttpResponseAttributes httpResponseAttributes = result.getHttpResponseAttributes();
        List<TypedValue<String>> items = this.restPollingSourceStrategy.extractItems(lastWatermark, rawPage, httpResponseAttributes.getStatusCode(), httpResponseAttributes.getReasonPhrase(), httpResponseAttributes.getHeaders());
        if (items == null) {
            throw new IllegalArgumentException("Extracted items must not be null. An empty list must be returned instead of null.");
        }
        for (TypedValue<String> item : items) {
            A itemAttributes = this.restPollingSourceStrategy.getItemAttributes(httpResponseAttributes.getStatusCode(), httpResponseAttributes.getReasonPhrase(), httpResponseAttributes.getHeaders(), item);
            pollContext.accept(this.getPollItemConsumer(lastWatermark, rawPage, item, itemAttributes));
        }
    }

    /**
     * Builds the callback that fills a poll item with the payload, watermark and identity of one
     * extracted item.
     *
     * <p>The payload bytes are encoded with the charset declared by the item's media type so that
     * they agree with the media type set on the result; when the media type declares no charset
     * the platform default is used.
     *
     * @param lastWatermark the watermark read at the beginning of this poll
     * @param fullResponse the complete response body the item was extracted from
     * @param item the extracted item
     * @param itemAttributes the attributes to attach to the item
     * @return the consumer to hand to {@link PollContext#accept(Consumer)}
     */
    private Consumer<PollContext.PollItem<InputStream, A>> getPollItemConsumer(Optional<S> lastWatermark, TypedValue<String> fullResponse, TypedValue<String> item, A itemAttributes) {
        return pollItem -> {
            MediaType itemMediaType = item.getDataType().getMediaType();
            Charset itemCharset = itemMediaType.getCharset().orElseGet(Charset::defaultCharset);
            Result<InputStream, A> itemResult = Result.<InputStream, A>builder()
                .output(new ByteArrayInputStream(item.getValue().getBytes(itemCharset)))
                .mediaType(itemMediaType)
                .attributes(itemAttributes)
                .build();
            pollItem.setResult(itemResult);
            pollItem.setWatermark(this.restPollingSourceStrategy.getItemWatermark(lastWatermark, fullResponse, item));
            pollItem.setId(this.restPollingSourceStrategy.getItemIdentity(lastWatermark, fullResponse, item));
        };
    }

    /**
     * Closes the payload stream of an item that was rejected and logs the rejection at debug level.
     *
     * @param result the rejected item
     * @param callbackContext the callback context of the rejected item (unused)
     */
    @Override
    public void onRejectedItem(Result<InputStream, A> result, SourceCallbackContext callbackContext) {
        InputStream output = result.getOutput();
        if (output != null) {
            RestUtils.closeStream(output);
        }
        LOGGER.debug("Item Rejected");
    }

    /**
     * Builds the HTTP request for the next poll.
     *
     * @param lastWatermark the watermark obtained from the strategy for this poll; may be empty
     * @return the request to send
     */
    protected abstract HttpRequest getRequest(Optional<S> lastWatermark);
}

