/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.publisher.source;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.pubsub.v1.PushConfig;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.publisher.source.BackoffConfig;
import io.cdap.plugin.gcp.publisher.source.GoogleSubscriberConfig;
import io.cdap.plugin.gcp.publisher.source.PubSubDirectDStream;
import io.cdap.plugin.gcp.publisher.source.PubSubInputDStream;
import io.cdap.plugin.gcp.publisher.source.PubSubMessage;
import io.cdap.plugin.gcp.publisher.source.PubSubStructuredRecordConverter;
import io.cdap.plugin.gcp.publisher.source.PubSubSubscriberConfig;
import io.cdap.plugin.gcp.publisher.source.SerializableFunction;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.dstream.DStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;
import scala.collection.Iterable;
import scala.collection.JavaConverters;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

/**
 * Static helpers for the Pub/Sub subscriber sources: building the Spark input stream, creating a
 * subscription with retries, and assembling retry settings and credentials.
 *
 * <p>This class is not instantiable.
 */
public final class PubSubSubscriberUtil {
    protected static final Logger LOG = LoggerFactory.getLogger(PubSubSubscriberUtil.class);
    private static final int RESOURCE_EXHAUSTED = StatusCode.Code.RESOURCE_EXHAUSTED.getHttpStatusCode();
    private static final int CANCELLED = StatusCode.Code.CANCELLED.getHttpStatusCode();
    private static final int INTERNAL = StatusCode.Code.INTERNAL.getHttpStatusCode();
    private static final int UNAVAILABLE = StatusCode.Code.UNAVAILABLE.getHttpStatusCode();
    private static final int DEADLINE_EXCEEDED = StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode();
    // NOTE(review): retryability is matched on the HTTP status code, not on the StatusCode.Code enum
    // itself, so any other Code that maps to one of these HTTP codes is also treated as retryable.
    // Confirm this is intended before tightening it.
    private static final Set<Integer> RETRYABLE_STATUS_CODES = Stream.of(RESOURCE_EXHAUSTED, CANCELLED, INTERNAL, UNAVAILABLE, DEADLINE_EXCEEDED).collect(Collectors.toSet());
    /** Maximum number of attempts used for subscription creation and for the default retry settings. */
    private static final int MAX_ATTEMPTS = 5;
    /** Ack deadline passed to {@code createSubscription} when a subscription is created. */
    private static final int ACK_DEADLINE_SECONDS = 60;

    private PubSubSubscriberUtil() {
    }

    /**
     * Builds the input stream for the given subscriber configuration.
     *
     * <p>Auto-acknowledgement is enabled unless the streaming context reports that preview is enabled.
     *
     * @param streamingContext the streaming context the stream is created in
     * @param config the subscriber configuration
     * @param mappingFunction converts each {@link PubSubMessage} into the stream's element type
     * @param <T> element type of the returned stream
     * @return the stream of mapped messages
     * @throws Exception declared for callers; see {@link #getInputDStream}
     */
    public static <T> JavaDStream<T> getStream(StreamingContext streamingContext, PubSubSubscriberConfig config, SerializableFunction<PubSubMessage, T> mappingFunction) throws Exception {
        boolean autoAcknowledge = !streamingContext.isPreviewEnabled();
        return PubSubSubscriberUtil.getInputDStream(streamingContext, config, autoAcknowledge, mappingFunction);
    }

    /**
     * Creates the underlying stream.
     *
     * <p>When the state store is enabled a single {@link PubSubDirectDStream} is used, which receives
     * the mapping function directly. Otherwise one {@link PubSubInputDStream} is created per configured
     * reader, the receivers are unioned, and the mapping function is applied to the unioned stream.
     *
     * @param streamingContext the streaming context the stream is created in
     * @param config the subscriber configuration
     * @param autoAcknowledge passed through to the created stream(s)
     * @param mappingFn converts each {@link PubSubMessage} into the stream's element type
     * @param <T> element type of the returned stream
     * @return the stream of mapped messages
     */
    protected static <T> JavaDStream<T> getInputDStream(StreamingContext streamingContext, PubSubSubscriberConfig config, boolean autoAcknowledge, SerializableFunction<PubSubMessage, T> mappingFn) {
        ClassTag tag = ClassTag$.MODULE$.apply(PubSubMessage.class);
        if (streamingContext.isStateStoreEnabled()) {
            // NOTE(review): the class tag is built from PubSubMessage although the direct stream's
            // element type is T — confirm this is intentional.
            PubSubDirectDStream<T> pubSubDirectDStream = new PubSubDirectDStream<T>(streamingContext, config, streamingContext.getBatchInterval(), autoAcknowledge, mappingFn);
            return new JavaDStream(pubSubDirectDStream, tag);
        }

        int numberOfReaders = config.getNumberOfReaders();
        ArrayList<PubSubInputDStream> receivers = new ArrayList<PubSubInputDStream>(numberOfReaders);
        for (int reader = 0; reader < numberOfReaders; reader++) {
            receivers.add(new PubSubInputDStream(streamingContext.getSparkStreamingContext().ssc(), config, StorageLevel.MEMORY_ONLY(), autoAcknowledge));
        }
        DStream dStream = streamingContext.getSparkStreamingContext().ssc().union(((Iterable)JavaConverters.collectionAsScalaIterableConverter(receivers).asScala()).toSeq(), tag);
        return new JavaDStream(dStream, tag).map((Function & Serializable)message -> mappingFn.apply((PubSubMessage)message));
    }

    /**
     * Creates a subscription, retrying with backoff on retryable failures.
     *
     * <p>Up to {@link #MAX_ATTEMPTS} attempts are made while {@code preCheck} keeps returning
     * {@code true}. An {@code ALREADY_EXISTS} failure is treated as success. A failure that
     * {@code isRetryableException} rejects is rethrown immediately.
     *
     * @param preCheck evaluated before every attempt and before every backoff sleep; attempts stop
     *     once it returns {@code false}
     * @param backoffConfig initial backoff, backoff factor and maximum backoff
     * @param subscription subscription name passed to the admin client
     * @param topic topic name passed to the admin client
     * @param clientSupplier supplies the admin client used for each attempt
     * @param isRetryableException decides whether a failed attempt should be retried
     * @throws ApiException if an attempt fails with a non-retryable error
     * @throws RuntimeException if the subscription was not created because attempts were exhausted
     *     or {@code preCheck} returned {@code false}; the last {@link ApiException}, if any, is the cause
     * @throws InterruptedException if the thread is interrupted while backing off
     * @throws IOException declared for callers; not thrown by the code in this method
     */
    public static void createSubscription(BooleanSupplier preCheck, BackoffConfig backoffConfig, String subscription, String topic, Supplier<SubscriptionAdminClient> clientSupplier, Predicate<ApiException> isRetryableException) throws InterruptedException, IOException {
        int backoff = backoffConfig.getInitialBackoffMs();
        int attempts = MAX_ATTEMPTS;
        ApiException lastApiException = null;
        while (preCheck.getAsBoolean() && attempts-- > 0) {
            try {
                // NOTE(review): the client obtained from the supplier is not closed here; confirm that
                // whoever provides the supplier owns the client's lifecycle.
                SubscriptionAdminClient subscriptionAdminClient = clientSupplier.get();
                subscriptionAdminClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), ACK_DEADLINE_SECONDS);
                return;
            }
            catch (ApiException ae) {
                lastApiException = ae;
                if (ae.getStatusCode().getCode() == StatusCode.Code.ALREADY_EXISTS) {
                    // Someone else created it already; nothing left to do.
                    return;
                }
                if (!isRetryableException.test(ae)) {
                    throw ae;
                }
                backoff = PubSubSubscriberUtil.sleepAndIncreaseBackoff(preCheck, backoff, backoffConfig);
            }
        }
        if (lastApiException == null) {
            // The pre-check was already false on entry, so there is no underlying failure to report.
            throw new RuntimeException(String.format("Subscription '%s' was not created: the pre-check failed before any attempt was made.", subscription));
        }
        throw new RuntimeException(String.format("Failed to create subscription '%s' for topic '%s'.", subscription, topic), lastApiException);
    }

    /**
     * Tells whether a failed API call should be retried.
     *
     * @param ae the failure to inspect
     * @return {@code true} if the exception reports itself as retryable or the HTTP status code of
     *     its status is one of the retryable codes defined in this class
     */
    public static boolean isApiExceptionRetryable(ApiException ae) {
        if (ae.isRetryable()) {
            return true;
        }
        return RETRYABLE_STATUS_CODES.contains(ae.getStatusCode().getCode().getHttpStatusCode());
    }

    /**
     * Returns the function that converts a {@link PubSubMessage} into a {@link StructuredRecord}.
     *
     * @param config configuration handed to the converter
     * @return a new {@link PubSubStructuredRecordConverter}
     */
    public static SerializableFunction<PubSubMessage, StructuredRecord> getMappingFunction(GoogleSubscriberConfig config) {
        return new PubSubStructuredRecordConverter(config);
    }

    /**
     * Builds retry settings from {@link BackoffConfig#defaultInstance()}: initial delay, maximum
     * delay and delay multiplier come from the backoff configuration, and the attempt limit is
     * {@link #MAX_ATTEMPTS}.
     *
     * @return the retry settings
     */
    public static RetrySettings getRetrySettings() {
        BackoffConfig backoffConfig = BackoffConfig.defaultInstance();
        return RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis((long)backoffConfig.getInitialBackoffMs()))
            .setMaxRetryDelay(Duration.ofMillis((long)backoffConfig.getMaximumBackoffMs()))
            .setRetryDelayMultiplier(backoffConfig.getBackoffFactor())
            .setMaxAttempts(MAX_ATTEMPTS)
            .build();
    }

    /**
     * Loads service account credentials.
     *
     * @param serviceAccount the service account value, or {@code null} for no explicit credentials
     * @param serviceAccountFilePath passed through to
     *     {@link GCPUtils#loadServiceAccountCredentials}
     * @return the loaded credentials, or {@code null} when {@code serviceAccount} is {@code null}
     * @throws RuntimeException if loading the credentials fails with an {@link IOException}
     */
    @Nullable
    public static Credentials createCredentials(String serviceAccount, boolean serviceAccountFilePath) {
        if (serviceAccount == null) {
            return null;
        }
        try {
            return GCPUtils.loadServiceAccountCredentials(serviceAccount, serviceAccountFilePath);
        }
        catch (IOException e) {
            throw new RuntimeException("Error creating credentials from service account.", e);
        }
    }

    /**
     * Invokes {@code supplier}, retrying with backoff while it fails with a retryable
     * {@link ApiException} (see {@link #isApiExceptionRetryable}).
     *
     * @param supplier the call to execute
     * @param backoffConfig initial backoff, backoff factor and maximum backoff
     * @param maxAttempts maximum number of invocations; must be positive
     * @param <T> result type of the call
     * @return the value returned by the first successful invocation
     * @throws IllegalArgumentException if {@code maxAttempts} is not positive
     * @throws ApiException if an invocation fails with a non-retryable error
     * @throws RuntimeException if every attempt failed with a retryable error; the last
     *     {@link ApiException} is the cause
     * @throws Exception declared for callers; includes {@link InterruptedException} from the
     *     backoff sleep
     */
    public static <T> T callWithRetry(Supplier<T> supplier, BackoffConfig backoffConfig, int maxAttempts) throws Exception {
        if (maxAttempts <= 0) {
            // Without this guard the loop never runs and the caller would get an exception with no
            // message and no cause.
            throw new IllegalArgumentException("maxAttempts must be positive, got " + maxAttempts);
        }
        int backoff = backoffConfig.getInitialBackoffMs();
        ApiException lastApiException = null;
        for (int remaining = maxAttempts; remaining > 0; remaining--) {
            try {
                return supplier.get();
            }
            catch (ApiException ae) {
                lastApiException = ae;
                if (!PubSubSubscriberUtil.isApiExceptionRetryable(ae)) {
                    throw ae;
                }
                backoff = PubSubSubscriberUtil.sleepAndIncreaseBackoff(() -> true, backoff, backoffConfig);
            }
        }
        throw new RuntimeException(String.format("Call failed after %d attempts.", maxAttempts), lastApiException);
    }

    /**
     * Creates a subscription admin client, using the given credentials when they are not
     * {@code null}.
     *
     * @param credentials credentials to use, or {@code null} to leave the settings' credentials
     *     provider untouched
     * @return a new admin client
     * @throws IOException if building the settings or creating the client fails
     */
    private static SubscriptionAdminClient buildSubscriptionAdminClient(Credentials credentials) throws IOException {
        SubscriptionAdminSettings.Builder builder = SubscriptionAdminSettings.newBuilder();
        if (credentials != null) {
            builder.setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)credentials));
        }
        return SubscriptionAdminClient.create((SubscriptionAdminSettings)builder.build());
    }

    /**
     * Sleeps for the current backoff (only if {@code preCheck} still holds) and returns the next
     * backoff value.
     *
     * @param preCheck when it returns {@code false} the sleep is skipped
     * @param backoff current backoff in milliseconds
     * @param backoffConfig supplies the backoff factor and maximum backoff
     * @return the backoff to use for the next retry
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static int sleepAndIncreaseBackoff(BooleanSupplier preCheck, int backoff, BackoffConfig backoffConfig) throws InterruptedException {
        if (preCheck.getAsBoolean()) {
            LOG.trace("Backoff - Sleeping for {} ms.", backoff);
            Thread.sleep(backoff);
        }
        return PubSubSubscriberUtil.calculateUpdatedBackoff(backoff, backoffConfig);
    }

    /**
     * Multiplies the backoff by the configured factor, capped at the configured maximum.
     *
     * @param backoff current backoff in milliseconds
     * @param backoffConfig supplies the backoff factor and maximum backoff
     * @return the increased backoff, never larger than the configured maximum
     */
    private static int calculateUpdatedBackoff(int backoff, BackoffConfig backoffConfig) {
        int scaled = (int)((double)backoff * backoffConfig.getBackoffFactor());
        return Math.min(scaled, backoffConfig.getMaximumBackoffMs());
    }
}

