/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mq.restclient.internal.client;

import com.mulesoft.mq.restclient.api.AnypointMQMessage;
import com.mulesoft.mq.restclient.api.DestinationLocation;
import com.mulesoft.mq.restclient.api.DestinationLocationBuilder;
import com.mulesoft.mq.restclient.api.Lock;
import com.mulesoft.mq.restclient.api.MessageIdResult;
import com.mulesoft.mq.restclient.api.exception.MQClientConnectionException;
import com.mulesoft.mq.restclient.impl.OAuthCredentials;
import com.mulesoft.mq.restclient.internal.CourierRestClient;
import com.mulesoft.mq.restclient.internal.CourierUrlBuilder;
import com.mulesoft.mq.restclient.internal.ExceptionFactory;
import com.mulesoft.mq.restclient.internal.JsonUtils;
import com.mulesoft.mq.restclient.internal.MessageUtils;
import com.mulesoft.mq.restclient.internal.NewTtl;
import com.mulesoft.mq.restclient.internal.Request;
import com.mulesoft.mq.restclient.internal.RequestBuilder;
import com.mulesoft.mq.restclient.internal.Response;
import com.mulesoft.mq.restclient.internal.Utils;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.ExponentialBackOff;
import rx.Observable;
import rx.functions.Func1;
import rx.observables.BlockingObservable;
import rx.schedulers.Schedulers;

public abstract class AbstractCourierRestClient
implements CourierRestClient {
    /** Query parameter sent on receive when more than one message is requested. */
    public static final String BATCH_SIZE_QUERY_PARAM = "batchSize";
    /** Query parameter carrying the polling time on receive (constant name is historical). */
    public static final String POOLING_TIME_QUERY_PARAM = "pollingTime";
    /** Query parameter carrying the lock TTL on receive. */
    public static final String LOCK_TTL_QUERY_PARAM = "lockTtl";
    // NOTE(review): public, mutable and not referenced in this class; left non-final because
    // external code may assign it.
    public static long DEFAULT_TIMEOUT_MILLIS = 20000L;
    /** Number of retries for a failing operation; overridable with -Dmax.retries.failure.operation. */
    public static final int MAX_RETRIES = Integer.getInteger("max.retries.failure.operation", 5);
    protected static final Logger logger = LoggerFactory.getLogger(AbstractCourierRestClient.class);
    /** Extra milliseconds granted on top of the polling time when waiting for a receive response. */
    protected static final int EXTRA_RECEIVE_TIMEOUT = 2000;
    /** Prefix of the Authorization header value; the trailing space is significant. */
    private static final String AUTHENTICATION_TOKEN_TYPE = "Bearer ";
    // Back-off tuning is read from system properties (the "object.store.client" prefix is as shipped).
    private static final long BACK_OFF_MULTIPLIER = Long.parseLong(System.getProperty("object.store.client.back.off.multiplier", "2"));
    private static final long BACK_OFF_INITIAL_WAIT = Long.parseLong(System.getProperty("object.store.client.back.off.initial.wait", "1000"));
    private static final long BACK_OFF_MAX_INTERVAL = Long.parseLong(System.getProperty("object.store.client.back.off.max.interval", "30000"));
    /** Shared immutable result emitted when the broker answers a receive with 204. */
    private static final List<AnypointMQMessage> EMPTY_MESSAGES_LIST = Collections.emptyList();
    /** Maximum number of attempts made to obtain an access token in one login. */
    private static final int MAX_GET_ACCESS_TOKEN_RETRIES = 5;
    private final OAuthCredentials oAuthCredentials;
    private final String userAgentInfo;
    private final CourierUrlBuilder courierUrlBuilder;
    // Only touched while holding this instance's monitor.
    private BlockingObservable<Response> observableAccessTokenResponse;
    // The fields below are read without holding the monitor (fast-path token checks, retry
    // callbacks running on scheduler threads), so they must be volatile for those reads to
    // observe writes made by other threads.
    private volatile String accessToken;
    private volatile String defaultOrganizationId;
    private volatile String defaultEnvironmentId;
    private volatile boolean alreadyDisposed = false;

    /**
     * Creates a client bound to the given broker URL and OAuth credentials.
     *
     * @param courierApiUrl    base URL handed to the {@code CourierUrlBuilder}
     * @param oAuthCredentials client id/secret used to authorize against the broker
     * @param userAgentInfo    value for the User-Agent header; when null or blank the
     *                         default from {@code Utils.getUserAgent()} is used
     */
    public AbstractCourierRestClient(String courierApiUrl, OAuthCredentials oAuthCredentials, String userAgentInfo) {
        this.courierUrlBuilder = new CourierUrlBuilder(courierApiUrl);
        this.oAuthCredentials = oAuthCredentials;
        if (userAgentInfo == null || userAgentInfo.trim().isEmpty()) {
            this.userAgentInfo = Utils.getUserAgent();
        } else {
            this.userAgentInfo = userAgentInfo;
        }
    }

    /** Initializes the client by making sure an access token is available. */
    @Override
    public void init() {
        ensureAccessToken();
    }

    /**
     * Issues a fresh authorization request and blocks for its first response.
     *
     * @throws MQClientConnectionException when the response is not OK
     */
    @Override
    public void validate() {
        Response authResponse = this.process(this.createAuthRequest()).toBlocking().first();
        if (authResponse.isOk()) {
            return;
        }
        throw new MQClientConnectionException("Connection is not valid");
    }

    /** Marks the client as disposed; the retry pipeline checks this flag and fails fast afterwards. */
    @Override
    public void dispose() {
        logger.debug("Client Disposed");
        alreadyDisposed = true;
    }

    /**
     * Builds the location of a destination inside the default organization and environment.
     *
     * @param destinationName destination name; must not be null or blank
     * @return the location built from the default ids and the given name
     */
    @Override
    public DestinationLocation getDestinationLocation(String destinationName) {
        boolean hasName = destinationName != null && !destinationName.trim().isEmpty();
        Utils.checkArgument(hasName, "destinationName cannot be null nor empty");
        return new DestinationLocationBuilder()
                .setOrganizationId(this.getDefaultOrganizationId())
                .setEnvironmentId(this.getDefaultEnvironmentId())
                .setName(destinationName)
                .build();
    }

    /**
     * Returns the organization id taken from the authorization response, logging in first
     * when no token has been obtained yet.
     */
    public String getDefaultOrganizationId() {
        ensureAccessToken();
        return defaultOrganizationId;
    }

    /**
     * Returns the environment id taken from the authorization response, logging in first
     * when no token has been obtained yet.
     */
    public String getDefaultEnvironmentId() {
        ensureAccessToken();
        return defaultEnvironmentId;
    }

    /** Triggers a login only when no access token is currently cached. */
    protected void ensureAccessToken() {
        if (this.accessToken != null) {
            return;
        }
        this.getAccessToken();
    }

    /**
     * Requests messages from a destination.
     *
     * @param destinationLocation destination to read from
     * @param batchSize           sent as a query parameter only when greater than 1
     * @param pollingTime         sent as a query parameter only when positive; also extends the
     *                            request wait time to {@code pollingTime + EXTRA_RECEIVE_TIMEOUT} ms
     * @param lockTtl             sent as a query parameter only when positive
     * @return an observable emitting the received messages (an empty list on HTTP 204), or an
     *         error built by {@code ExceptionFactory}
     */
    @Override
    public Observable<List<AnypointMQMessage>> receive(DestinationLocation destinationLocation, int batchSize, long pollingTime, long lockTtl) {
        RequestBuilder builder = this.newRequestBuilderWithAgent()
                .use(RequestBuilder.Method.GET)
                .to(this.courierUrlBuilder.messages(destinationLocation));
        if (batchSize > 1) {
            builder.withQueryParam(BATCH_SIZE_QUERY_PARAM, Integer.toString(batchSize));
        }
        if (pollingTime > 0L) {
            builder.withQueryParam(POOLING_TIME_QUERY_PARAM, Long.toString(pollingTime));
            builder.waitingUpTo(pollingTime + EXTRA_RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS);
        }
        if (lockTtl > 0L) {
            builder.withQueryParam(LOCK_TTL_QUERY_PARAM, Long.toString(lockTtl));
        }
        return this.withAccessToken(() -> {
            // The token is read here, after withAccessToken has made sure one exists.
            Request request = builder.withHeader("Authorization", AUTHENTICATION_TOKEN_TYPE + this.accessToken).build();
            return this.processWithRetry(request).flatMap(response -> this.toReceivedMessages(response));
        });
    }

    /** Maps a receive response to its message list, or to an error observable. */
    private Observable<List<AnypointMQMessage>> toReceivedMessages(Response response) {
        try {
            if (response.getStatusCode() == 204) {
                return Observable.just(EMPTY_MESSAGES_LIST);
            }
            if (!response.isOk()) {
                return Observable.error(ExceptionFactory.create("RECEIVE MESSAGES", response, this.friendlyMessage(response)));
            }
            String body = response.getBody();
            return Observable.just(JsonUtils.courierRestMessagesFromJson(body));
        } catch (Exception e) {
            return Observable.error(ExceptionFactory.create("RECEIVE MESSAGES", e));
        }
    }

    /**
     * Publishes a single message with a PUT to the message URL derived from its id.
     *
     * @return an observable emitting the result for the message id, or an error
     */
    @Override
    public Observable<MessageIdResult> send(DestinationLocation destinationLocation, AnypointMQMessage message) {
        return this.withAccessToken(() -> {
            Request request = this.newRequestBuilderWithAgent()
                    .use(RequestBuilder.Method.PUT)
                    .to(this.courierUrlBuilder.message(destinationLocation, message.getMessageId()))
                    .withBody(JsonUtils.toJson(message))
                    .withHeader("Content-Type", "application/json")
                    .withHeader("Authorization", AUTHENTICATION_TOKEN_TYPE + this.accessToken)
                    .build();
            return this.oneResult("SEND ONE MESSAGE", this.processWithRetry(request));
        });
    }

    /**
     * Publishes a batch of messages with a PUT to the destination's messages URL.
     *
     * @return an observable emitting one result per message id, or an error
     */
    @Override
    public Observable<List<MessageIdResult>> send(DestinationLocation destinationLocation, List<AnypointMQMessage> messages) {
        return this.withAccessToken(() -> {
            Request request = this.newRequestBuilderWithAgent()
                    .use(RequestBuilder.Method.PUT)
                    .to(this.courierUrlBuilder.messages(destinationLocation))
                    .withBody(JsonUtils.toJson(messages))
                    .withHeader("Content-Type", "application/json")
                    .withHeader("Authorization", AUTHENTICATION_TOKEN_TYPE + this.accessToken)
                    .build();
            return this.manyResults("SEND MANY MESSAGES", this.processWithRetry(request));
        });
    }

    /**
     * Acknowledges one message: DELETE on the message URL with the lock id in the JSON body.
     *
     * @return an observable emitting the result for the message id, or an error
     */
    @Override
    public Observable<MessageIdResult> ack(DestinationLocation destinationLocation, Lock lock) {
        return this.withAccessToken(() -> {
            Request request = this.newRequestBuilderWithAgent()
                    .use(RequestBuilder.Method.DELETE)
                    .to(this.courierUrlBuilder.message(destinationLocation, lock.getMessageId()))
                    .withBody(JsonUtils.toJson("lockId", lock.getLockId()))
                    .withHeader("Content-Type", "application/json")
                    .withHeader("Authorization", AUTHENTICATION_TOKEN_TYPE + this.accessToken)
                    .build();
            return this.oneResult("ACK ONE MESSAGE", this.processWithRetry(request));
        });
    }

    /**
     * Acknowledges several messages: DELETE on the messages URL with the locks as JSON body.
     *
     * @return an observable emitting one result per message id, or an error
     */
    @Override
    public Observable<List<MessageIdResult>> ack(DestinationLocation destinationLocation, List<Lock> locks) {
        return this.withAccessToken(() -> {
            Request request = this.newRequestBuilderWithAgent()
                    .use(RequestBuilder.Method.DELETE)
                    .to(this.courierUrlBuilder.messages(destinationLocation))
                    .withBody(JsonUtils.toJson(locks))
                    .withHeader("Content-Type", "application/json")
                    .withHeader("Authorization", AUTHENTICATION_TOKEN_TYPE + this.accessToken)
                    .build();
            return this.manyResults("ACK MANY MESSAGES", this.processWithRetry(request));
        });
    }

    /**
     * Negatively acknowledges one message: DELETE on the lock URL, without a body.
     *
     * @return an observable emitting the result for the message id, or an error
     */
    @Override
    public Observable<MessageIdResult> nack(DestinationLocation destinationLocation, Lock lock) {
        return this.withAccessToken(() -> {
            Request request = this.newRequestBuilderWithAgent()
                    .use(RequestBuilder.Method.DELETE)
                    .to(this.courierUrlBuilder.lock(destinationLocation, lock.getMessageId(), lock.getLockId()))
                    .withHeader("Authorization", AUTHENTICATION_TOKEN_TYPE + this.accessToken)
                    .build();
            return this.oneResult("NOT-ACK ONE MESSAGE", this.processWithRetry(request));
        });
    }

    /**
     * Negatively acknowledges several messages: DELETE on the locks URL with the locks as JSON body.
     *
     * @return an observable emitting one result per message id, or an error
     */
    @Override
    public Observable<List<MessageIdResult>> nack(DestinationLocation destinationLocation, List<Lock> locks) {
        return this.withAccessToken(() -> {
            Request request = this.newRequestBuilderWithAgent()
                    .use(RequestBuilder.Method.DELETE)
                    .to(this.courierUrlBuilder.locks(destinationLocation))
                    .withBody(JsonUtils.toJson(locks))
                    .withHeader("Content-Type", "application/json")
                    .withHeader("Authorization", AUTHENTICATION_TOKEN_TYPE + this.accessToken)
                    .build();
            return this.manyResults("NOT-ACK MANY MESSAGES", this.processWithRetry(request));
        });
    }

    /**
     * Changes the lock TTL of one message: PATCH on the lock URL with the new ttl as JSON body.
     *
     * @return an observable emitting the result for the message id, or an error
     */
    @Override
    public Observable<MessageIdResult> modifyTtl(DestinationLocation destinationLocation, NewTtl newTtl) {
        return this.withAccessToken(() -> {
            Request request = this.newRequestBuilderWithAgent()
                    .use(RequestBuilder.Method.PATCH)
                    .to(this.courierUrlBuilder.lock(destinationLocation, newTtl.getMessageId(), newTtl.getLockId()))
                    .withBody(JsonUtils.toJson("ttl", Long.toString(newTtl.getTtl())))
                    .withHeader("Content-Type", "application/json")
                    .withHeader("Authorization", AUTHENTICATION_TOKEN_TYPE + this.accessToken)
                    .build();
            return this.oneResult("MODIFY TTL ONE MESSAGE", this.processWithRetry(request));
        });
    }

    /**
     * Changes the lock TTL of several messages: PATCH on the locks URL with the new TTLs as JSON body.
     *
     * @return an observable emitting one result per message id, or an error
     */
    @Override
    public Observable<List<MessageIdResult>> modifyTtl(DestinationLocation destinationLocation, List<NewTtl> newTtls) {
        return this.withAccessToken(() -> {
            Request request = this.newRequestBuilderWithAgent()
                    .use(RequestBuilder.Method.PATCH)
                    .to(this.courierUrlBuilder.locks(destinationLocation))
                    .withBody(JsonUtils.toJson(newTtls))
                    .withHeader("Content-Type", "application/json")
                    .withHeader("Authorization", AUTHENTICATION_TOKEN_TYPE + this.accessToken)
                    .build();
            return this.manyResults("MODIFY TTL MANY MESSAGES", this.processWithRetry(request));
        });
    }

    /**
     * Runs {@code requestExecutor} once an access token is available.
     * A login failure is converted into an error observable instead of being thrown.
     *
     * @param requestExecutor builds and starts the actual request
     * @return the executor's observable, or an error observable when login failed
     */
    private <T> Observable<T> withAccessToken(Supplier<Observable<T>> requestExecutor) {
        Exception loginFailure = null;
        try {
            this.getAccessToken();
        } catch (Exception e) {
            loginFailure = e;
        }
        if (loginFailure != null) {
            return Observable.error(loginFailure);
        }
        return requestExecutor.get();
    }

    /**
     * Drops the cached token and prepares a new (blocking) authorization request.
     * Does nothing when the token is already cleared and a pending authorization
     * observable exists, so a reset that is already in flight is not repeated.
     */
    private synchronized void resetAccessToken() {
        boolean resetAlreadyPending = this.accessToken == null && this.observableAccessTokenResponse != null;
        if (!resetAlreadyPending) {
            this.accessToken = null;
            this.observableAccessTokenResponse = this.process(this.createAuthRequest()).toBlocking();
        }
    }

    /**
     * Builds the client-credentials authorization request (POST, form-encoded body).
     * The client id and secret are URL-encoded so that reserved characters such as
     * {@code &}, {@code =}, {@code +} or {@code %} cannot corrupt the form body.
     *
     * @return the request to send to the authorize URL
     */
    private Request createAuthRequest() {
        String body = "client_id=" + formEncode(this.oAuthCredentials.getClientId())
                + "&client_secret=" + formEncode(this.oAuthCredentials.getClientSecret())
                + "&grant_type=client_credentials";
        return this.newRequestBuilderWithAgent()
                .use(RequestBuilder.Method.POST)
                .to(this.courierUrlBuilder.authorizeUrl())
                .withHeader("Content-Type", "application/x-www-form-urlencoded")
                .withBody(body)
                .build();
    }

    /** Encodes a value for an application/x-www-form-urlencoded body; null is rendered as "null", as plain concatenation would. */
    private static String formEncode(String value) {
        try {
            return URLEncoder.encode(String.valueOf(value), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 support is mandatory on every Java platform, so this cannot happen in practice.
            throw new IllegalStateException("UTF-8 encoding is not supported", e);
        }
    }

    /**
     * Returns the cached access token, logging in against the broker when none is cached.
     * <p>
     * The login runs while holding this instance's monitor and makes up to
     * {@code MAX_GET_ACCESS_TOKEN_RETRIES} attempts; each attempt blocks on the first
     * response of the pending authorization observable.
     *
     * @return the access token
     * @throws RuntimeException as built by {@code ExceptionFactory} when no token could be
     *         obtained; its message includes the last response and last exception, if any
     */
    public String getAccessToken() {
        if (this.accessToken != null) {
            return this.accessToken;
        }
        Response lastResponse = null;
        Exception lastFailure = null;
        synchronized (this) {
            // Re-check under the lock: another thread may have logged in meanwhile.
            if (this.accessToken == null) {
                if (this.observableAccessTokenResponse == null) {
                    this.resetAccessToken();
                }
                int attempt = 0;
                while (this.accessToken == null && attempt < MAX_GET_ACCESS_TOKEN_RETRIES) {
                    lastResponse = null;
                    try {
                        lastResponse = this.observableAccessTokenResponse.first();
                    } catch (Exception e) {
                        lastFailure = e;
                    }
                    if (lastResponse != null && lastResponse.isOk()) {
                        this.parseAccessTokenResponse(lastResponse);
                    }
                    attempt++;
                }
            }
        }
        if (this.accessToken != null) {
            return this.accessToken;
        }
        StringBuilder message = new StringBuilder("Can not login into broker");
        String responseDetails = this.friendlyMessage(lastResponse);
        if (!responseDetails.trim().isEmpty()) {
            message.append(" - ").append(responseDetails);
        }
        if (lastFailure != null) {
            message.append(" - ").append(lastFailure.getMessage());
        }
        String text = message.toString();
        logger.debug(text);
        throw ExceptionFactory.create("AUTHORIZE AGAINST BROKER", lastResponse, text, lastFailure);
    }

    /**
     * Extracts the access token and, when present, the default organization and environment
     * ids ("simple_client" object) from a successful authorization response.
     * <p>
     * The token is assigned last: methods such as {@code getDefaultOrganizationId()} treat a
     * non-null token as "login finished" without taking the lock, so the ids must already be
     * in place when the token becomes visible. It also means a failure while parsing the ids
     * no longer leaves a token cached alongside stale ids.
     *
     * @param accessTokenResponse OK response returned by the authorize endpoint
     */
    private void parseAccessTokenResponse(Response accessTokenResponse) {
        String body = accessTokenResponse.getBody();
        String token = JsonUtils.extractValueFromJson(body, "access_token");
        Map<String, String> simpleClient = JsonUtils.extractMapValueFromJson(body, "simple_client");
        if (simpleClient != null) {
            this.defaultOrganizationId = simpleClient.get("orgId");
            this.defaultEnvironmentId = simpleClient.get("envId");
        }
        // Publish the token only after the ids have been stored.
        this.accessToken = token;
    }

    /**
     * Renders a response as a short human-readable text for error messages.
     *
     * @param response response to describe; may be null
     * @return an empty string for null, "NOT AUTHORISED" for 401, otherwise
     *         "code - status text (body)"
     */
    private String friendlyMessage(Response response) {
        if (response == null) {
            return "";
        }
        if (response.getStatusCode() == 401) {
            return "NOT AUTHORISED";
        }
        return String.format("%d - %s (%s)", response.getStatusCode(), response.getStatusText(), response.getBody());
    }

    /**
     * Executes a request with retries, using an exponential back-off configured from the
     * BACK_OFF_* constants. The attempt budget is {@code MAX_RETRIES + 1} (first try plus retries).
     *
     * @param request request to execute
     * @return an observable emitting the final response or error
     */
    private Observable<Response> processWithRetry(Request request) {
        ExponentialBackOff backOffPolicy = new ExponentialBackOff();
        backOffPolicy.setInitialInterval(BACK_OFF_INITIAL_WAIT);
        backOffPolicy.setMultiplier((double) BACK_OFF_MULTIPLIER);
        backOffPolicy.setMaxInterval(BACK_OFF_MAX_INTERVAL);
        int attempts = MAX_RETRIES + 1;
        return this.internalProcessWithRetry(request, null, null, attempts, backOffPolicy.start());
    }

    /**
     * Performs one attempt of {@code request} and schedules a retry when the attempt ends
     * with a 5xx response or with an error.
     * <p>
     * Rewritten with typed lambdas: the previous raw {@code Func1} casts and the
     * {@code Observable.just((Object) response)} call defeated generic type checking (the
     * latter is not assignable to {@code Observable<? extends Response>}).
     *
     * @param request            request to execute
     * @param res                response of the previous failed attempt, or null
     * @param throwable          error of the previous failed attempt, or null
     * @param remaining          attempts left, including this one
     * @param exponentialBackOff back-off state shared by all attempts of this operation
     * @return an observable emitting the final response, or an error
     */
    private Observable<Response> internalProcessWithRetry(final Request request, Response res, Throwable throwable, final int remaining, final BackOffExecution exponentialBackOff) {
        if (this.alreadyDisposed) {
            return Observable.error(new MQClientConnectionException("Client has already been disposed"));
        }
        if (remaining <= 0) {
            return this.getErrorResponse(res, throwable);
        }
        return this.internalProcess(request)
                .<Response>flatMap(response -> {
                    int status = response.getStatusCode();
                    if (status >= 500 && status <= 599) {
                        // Server-side failure: retry, remembering this response as the last outcome.
                        return this.getRetryObservable(request, response, exponentialBackOff, null, remaining - 1);
                    }
                    return Observable.just(response);
                })
                .onErrorResumeNext(error -> this.getRetryObservable(request, null, exponentialBackOff, error, remaining - 1));
    }

    /**
     * Produces the terminal outcome once no further retry will be made.
     * <p>
     * The {@code (Object)} cast on the emitted response was removed: it made the expression an
     * {@code Observable<Object>}, which is not assignable to the declared return type.
     *
     * @param res       last response received, possibly null
     * @param throwable last error seen, or null when the last attempt produced a response
     * @return an error observable when {@code throwable} is set (after logging it), otherwise
     *         an observable emitting {@code res} as-is (which may be null)
     */
    private Observable<Response> getErrorResponse(Response res, Throwable throwable) {
        if (throwable == null) {
            return Observable.just(res);
        }
        logger.error("There was an error processing operation after {} retries", MAX_RETRIES, throwable);
        return Observable.error(throwable);
    }

    /**
     * Decides what follows a failed attempt: fail fast, return the last outcome, or wait and retry.
     *
     * @param request            request to retry
     * @param response           response of the failed attempt, or null when it failed with an error
     * @param exponentialBackOff back-off state used to compute the delay
     * @param throwable          error of the failed attempt, or null
     * @param remaining          attempts still available
     * @return an error when the client is disposed; the last outcome when no attempts remain or
     *         the error is a {@code RejectedExecutionException}; otherwise a deferred observable
     *         that waits for the jittered back-off and then runs the next attempt
     */
    private Observable<? extends Response> getRetryObservable(Request request, Response response, BackOffExecution exponentialBackOff, Throwable throwable, int remaining) {
        if (this.alreadyDisposed) {
            return Observable.error(new MQClientConnectionException("Client has already been disposed"));
        }
        boolean attemptsExhausted = remaining == 0;
        boolean executorRejected = throwable instanceof RejectedExecutionException;
        if (attemptsExhausted || executorRejected) {
            return this.getErrorResponse(response, throwable);
        }
        // Deferred so the delay is computed (and logged) at subscription time.
        return Observable.defer(() -> {
            long delayMillis = this.getExpoBackOffWithJitter(exponentialBackOff);
            logger.debug("There was an error processing operation retrying in {}ms, remaining attempts {}", delayMillis, remaining, throwable);
            return Observable.timer(delayMillis, TimeUnit.MILLISECONDS)
                    .flatMap(tick -> this.internalProcessWithRetry(request, response, throwable, remaining, exponentialBackOff));
        });
    }

    /**
     * Computes the next retry delay: the back-off interval scaled by a random factor in
     * {@code [1, BACK_OFF_MULTIPLIER)}, capped at {@code BACK_OFF_MAX_INTERVAL}.
     * <p>
     * Uses {@code ThreadLocalRandom} instead of allocating a new {@code Random} on every call,
     * and treats {@code BackOffExecution.STOP} like a missing back-off so a negative delay is
     * never handed to the timer.
     *
     * @param exponentialBackOff back-off state, or null
     * @return delay in milliseconds; {@code BACK_OFF_MAX_INTERVAL} when no interval is available
     */
    private long getExpoBackOffWithJitter(BackOffExecution exponentialBackOff) {
        if (exponentialBackOff == null) {
            return BACK_OFF_MAX_INTERVAL;
        }
        long baseInterval = exponentialBackOff.nextBackOff();
        if (baseInterval == BackOffExecution.STOP) {
            return BACK_OFF_MAX_INTERVAL;
        }
        double jitterFactor = 1.0 + ThreadLocalRandom.current().nextDouble() * (BACK_OFF_MULTIPLIER - 1L);
        long jittered = (long) (baseInterval * jitterFactor);
        return Math.min(jittered, BACK_OFF_MAX_INTERVAL);
    }

    /**
     * Executes a request once, transparently renewing the access token and re-sending the
     * request when the broker reports it as unauthorized (either as a response or as the
     * specific error recognized by {@code isUnauthorized}).
     * <p>
     * Rewritten with typed lambdas: the previous raw {@code Func1} casts and the
     * {@code Observable.just((Object) response)} call bypassed generic type checking (the
     * latter is not assignable to {@code Observable<Response>}).
     *
     * @param request request to execute
     * @return an observable emitting the response, or the original error
     */
    private Observable<Response> internalProcess(final Request request) {
        return this.process(request)
                .<Response>flatMap(response -> {
                    if (response.isUnauthorized()) {
                        return this.regenerateTokenAndRetry(request);
                    }
                    return Observable.just(response);
                })
                .onErrorResumeNext(error -> {
                    if (this.isUnauthorized(error)) {
                        return this.regenerateTokenAndRetry(request);
                    }
                    logger.debug("An error occurred while processing request {}", (Object) request.toString(), (Object) error);
                    return Observable.<Response>error(error);
                });
    }

    /**
     * Discards the current token, obtains a new one and re-sends {@code request} with the
     * refreshed Authorization header. The work is moved to a new thread via
     * {@code Schedulers.newThread()} because obtaining the token blocks.
     *
     * @param request the request that was rejected as unauthorized
     * @return an observable emitting the response of the re-sent request
     */
    private Observable<Response> regenerateTokenAndRetry(final Request request) {
        Observable<Object> trigger = Observable.just((Object) null).observeOn(Schedulers.newThread());
        return trigger.flatMap(ignored -> {
            this.resetAccessToken();
            Request retryRequest = this.newRequestBuilderWithAgent()
                    .wrap(request)
                    .withHeader("Authorization", AUTHENTICATION_TOKEN_TYPE + this.getAccessToken())
                    .build();
            return this.process(retryRequest);
        });
    }

    /**
     * Tells whether an error is the IllegalStateException raised for a 401 response that
     * carries no WWW-Authenticate header.
     */
    private boolean isUnauthorized(Throwable throwable) {
        if (!Utils.ofType(throwable, IllegalStateException.class)) {
            return false;
        }
        return Utils.withMessage(throwable, "401 response received, but no WWW-Authenticate header was present");
    }

    /**
     * Maps the response of a single-message operation to its {@code MessageIdResult}.
     * <p>
     * Rewritten with a typed lambda: the previous {@code Observable.just((Object) ...)} call
     * yielded an {@code Observable<Object>}, which does not match the declared result type,
     * and the raw {@code Func1} cast hid the mismatch.
     *
     * @param operation          operation name used in error messages
     * @param responseObservable observable emitting the raw response
     * @return an observable emitting the parsed result, or an error built by
     *         {@code ExceptionFactory} for non-OK responses and parsing failures
     */
    private Observable<MessageIdResult> oneResult(final String operation, Observable<Response> responseObservable) {
        return responseObservable.flatMap(response -> {
            try {
                if (!response.isOk()) {
                    return Observable.<MessageIdResult>error(ExceptionFactory.create(operation, response, this.friendlyMessage(response)));
                }
                return Observable.just(JsonUtils.messageIdResultFromJson(response.getBody()));
            } catch (Exception e) {
                return Observable.<MessageIdResult>error(ExceptionFactory.create(operation, e));
            }
        });
    }

    /**
     * Maps the response of a batch operation to its list of {@code MessageIdResult}.
     *
     * @param operation          operation name used in error messages
     * @param responseObservable observable emitting the raw response
     * @return an observable emitting the parsed results, or an error built by
     *         {@code ExceptionFactory} for non-OK responses and parsing failures
     */
    private Observable<List<MessageIdResult>> manyResults(final String operation, Observable<Response> responseObservable) {
        return responseObservable.flatMap(response -> {
            try {
                if (!response.isOk()) {
                    return Observable.<List<MessageIdResult>>error(ExceptionFactory.create(operation, response, this.friendlyMessage(response)));
                }
                return Observable.just(JsonUtils.messageIdResultsFromJson(response.getBody()));
            } catch (Exception e) {
                return Observable.<List<MessageIdResult>>error(ExceptionFactory.create(operation, e));
            }
        });
    }

    /** Logs, at debug level, the method and URL of a request that is about to be sent. */
    protected void logProcessStart(Request request) {
        Object method = request.getMethod();
        Object url = request.getUrl();
        logger.debug("Sending {} request to {}", method, url);
    }

    /** Logs, at debug level, the request id and status code of a received response. */
    protected void logProcessSuccess(Request request, Response response) {
        logger.debug("Received response from {} {}. Request Id: {}. Response code: {}",
                request.getMethod(), request.getUrl(), this.getRequestId(response), response.getStatusCode());
    }

    /**
     * Logs a failed request. Timeouts are logged at debug level only; every other failure is
     * logged once at error level (message only) and once at debug level (with the stack trace).
     *
     * @param request request that failed
     * @param e       failure cause
     */
    protected void logProcessError(Request request, Throwable e) {
        if (Utils.isTimeout(e)) {
            logger.debug("Request timed out from {} {}.", (Object) request.getMethod(), (Object) request.getUrl());
            return;
        }
        if (Utils.isConnectionInterrupted(e)) {
            logger.error(String.format("Connection unexpectedly closed from %s %s.", request.getMethod(), request.getUrl()));
            logger.debug(String.format("Connection unexpectedly closed from %s %s.", request.getMethod(), request.getUrl()), e);
            return;
        }
        if (Utils.isConnectionRefused(e)) {
            logger.error(String.format("Can not connect to broker for %s %s.", request.getMethod(), request.getUrl()));
            logger.debug(String.format("Can not connect to broker for %s %s.", request.getMethod(), request.getUrl()), e);
            return;
        }
        logger.error(String.format("Error from %s %s. %s", request.getMethod(), request.getUrl(), MessageUtils.getCompleteMessage(e)));
        logger.debug(String.format("Error from %s %s.", request.getMethod(), request.getUrl()), e);
    }

    /**
     * Reads the X-Request-Id header of a response for logging purposes.
     *
     * @return the header value, or "No-Id" when it is missing, empty or cannot be read
     */
    private String getRequestId(Response response) {
        String requestId;
        try {
            requestId = response.getHeader("X-Request-Id");
        } catch (Exception e) {
            // Best effort: this value is only used in log output.
            return "No-Id";
        }
        boolean missing = requestId == null || requestId.isEmpty();
        return missing ? "No-Id" : requestId;
    }

    /** Creates a request builder that already carries this client's User-Agent header. */
    private RequestBuilder newRequestBuilderWithAgent() {
        RequestBuilder builder = this.newRequestBuilder();
        return builder.withHeader("User-Agent", this.userAgentInfo);
    }

    /**
     * Supplies the request builder used for every request of this client.
     * Called once per request; this class adds the User-Agent header to the returned builder.
     *
     * @return a request builder provided by the concrete client
     */
    protected abstract RequestBuilder newRequestBuilder();

    /**
     * Executes a request and exposes its outcome as an observable.
     * This class uses it for authorization requests, for regular operations and for
     * re-sending a request after the access token has been renewed.
     *
     * @param var1 the request to execute
     * @return an observable of the response
     */
    protected abstract Observable<Response> process(Request var1);
}

