/*
 * Decompiled with CFR 0.152.
 */
package akka.stream.alpakka.sqs.scaladsl;

import akka.NotUsed;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.OverflowStrategy$;
import akka.stream.alpakka.sqs.SqsSourceSettings;
import akka.stream.alpakka.sqs.SqsSourceSettings$;
import akka.stream.alpakka.sqs.impl.BalancingMapAsync;
import akka.stream.alpakka.sqs.scaladsl.SqsAckFlow$;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.Seq$;
import scala.compat.java8.FutureConverters;
import scala.compat.java8.FutureConverters$;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxesRunTime;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

/**
 * Decompiled singleton holder for the Scala-DSL {@code SqsSource} factory.
 *
 * <p>The single instance is published through {@link #MODULE$}; it is created by the
 * static initializer and assigned inside the private constructor. The class builds an
 * Akka Streams {@code Source} that repeatedly issues a {@code ReceiveMessageRequest}
 * against an SQS queue and emits the received {@code Message}s one by one.
 *
 * <p>NOTE(review): this is decompiler output (raw types, intersection-cast lambdas,
 * synthetic {@code $anonfun$...} methods). Member names and signatures appear to be
 * compiler-generated and should be treated as part of the binary interface.
 */
public final class SqsSource$ {
    /** The singleton instance; assigned by the private constructor during class initialization. */
    public static SqsSource$ MODULE$;

    static {
        // Instantiation is done purely for its side effect: the constructor stores `this` in MODULE$.
        new SqsSource$();
    }

    /**
     * Creates a source that polls the given SQS queue and emits each received message.
     *
     * <p>Steps performed, in order:
     * <ol>
     *   <li>the client is passed to {@code SqsAckFlow$.checkClient};</li>
     *   <li>a single {@code ReceiveMessageRequest} is built from {@code settings}
     *       (attribute names, message attribute names, max batch size, wait time and,
     *       when present, visibility timeout);</li>
     *   <li>that same request object is repeated as the stream input and sent through the
     *       handler chosen by {@link #resolveHandler(int, SqsAsyncClient)};</li>
     *   <li>each response is converted to a Scala {@code List} of messages, filtered by the
     *       {@code takeWhile} predicate in {@link #$anonfun$apply$4}, flattened with
     *       {@code mapConcat}, and buffered with {@code settings.maxBufferSize()} using the
     *       backpressure overflow strategy.</li>
     * </ol>
     *
     * @param queueUrl  URL of the queue, set on the request builder
     * @param settings  source settings read for request parameters, parallelism, buffer size
     *                  and the close-on-empty flag
     * @param sqsClient async client used to execute {@code receiveMessage}
     * @return a source of individual SQS messages
     * @throws MatchError if {@code settings.visibilityTimeout()} is neither {@code None} nor {@code Some}
     */
    public Source<Message, NotUsed> apply(String queueUrl, SqsSourceSettings settings, SqsAsyncClient sqsClient) {
        ReceiveMessageRequest receiveMessageRequest;
        SqsAckFlow$.MODULE$.checkClient(sqsClient);
        // Builder chain: the two `.map(... .name() ...)` calls turn the settings' attribute-name
        // objects into their string names, and the Scala sequences are converted to Java lists
        // before being handed to the AWS SDK builder.
        ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder().queueUrl(queueUrl).attributeNamesWithStrings((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)settings.attributeNames().map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.name(), Seq$.MODULE$.canBuildFrom())).asJava()).messageAttributeNames((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)settings.messageAttributeNames().map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.name(), Seq$.MODULE$.canBuildFrom())).asJava()).maxNumberOfMessages(Predef$.MODULE$.int2Integer(settings.maxBatchSize())).waitTimeSeconds(Predef$.MODULE$.int2Integer(settings.waitTimeSeconds()));
        // Decompiled form of a Scala pattern match on the optional visibility timeout.
        Option<FiniteDuration> option = settings.visibilityTimeout();
        if (None$.MODULE$.equals(option)) {
            // No timeout configured: build the request without a visibilityTimeout value.
            receiveMessageRequest = (ReceiveMessageRequest)requestBuilder.build();
        } else if (option instanceof Some) {
            Some some = (Some)option;
            FiniteDuration t = (FiniteDuration)some.value();
            // The duration is sent as whole seconds.
            // NOTE(review): `(int)` is a narrowing cast from long; a value outside the int range
            // would wrap silently. Presumably SqsSourceSettings validates the range - confirm.
            receiveMessageRequest = (ReceiveMessageRequest)requestBuilder.visibilityTimeout(Predef$.MODULE$.int2Integer((int)t.toSeconds())).build();
        } else {
            // Exhaustiveness fallback emitted for the pattern match; not expected for a well-formed Option.
            throw new MatchError(option);
        }
        // Pipeline: repeat(request) -> receive handler -> response.messages() as List
        //           -> takeWhile(predicate) -> mapConcat(identity) -> buffer(maxBufferSize, backpressure).
        return (Source)Source$.MODULE$.repeat((Object)receiveMessageRequest).via(this.resolveHandler(settings.parallelRequests(), sqsClient)).map((Function1 & Serializable & scala.Serializable)x$3 -> ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(x$3.messages()).asScala()).toList()).takeWhile((Function1 & Serializable & scala.Serializable)messages -> BoxesRunTime.boxToBoolean((boolean)SqsSource$.$anonfun$apply$4(settings, messages))).mapConcat((Function1 & Serializable & scala.Serializable)x -> (List)Predef$.MODULE$.identity(x)).buffer(settings.maxBufferSize(), OverflowStrategy$.MODULE$.backpressure());
    }

    /**
     * Returns {@code SqsSourceSettings$.MODULE$.Defaults()}.
     *
     * <p>NOTE(review): the name follows the pattern the Scala compiler uses for the default
     * value of the second parameter of {@code apply} (i.e. {@code settings}) - confirm
     * against the original Scala source.
     *
     * @return the default source settings
     */
    public SqsSourceSettings apply$default$2() {
        return SqsSourceSettings$.MODULE$.Defaults();
    }

    /**
     * Chooses the flow stage that executes receive requests against SQS.
     *
     * <p>For a parallelism of exactly 1 a plain {@code mapAsyncUnordered} flow is returned.
     * For any other value a {@code BalancingMapAsync} stage is created with the same async
     * call plus a two-argument function backed by {@link #$anonfun$resolveHandler$3}.
     * In both cases the client's {@code CompletionStage} result is converted to a Scala future.
     *
     * @param parallelism value of {@code settings.parallelRequests()} supplied by {@code apply}
     * @param sqsClient   async client whose {@code receiveMessage} is invoked per request
     * @return a flow graph from {@code ReceiveMessageRequest} to {@code ReceiveMessageResponse}
     */
    private Graph<FlowShape<ReceiveMessageRequest, ReceiveMessageResponse>, NotUsed> resolveHandler(int parallelism, SqsAsyncClient sqsClient) {
        if (parallelism == 1) {
            return (Graph)Flow$.MODULE$.apply().mapAsyncUnordered(parallelism, (Function1 & Serializable & scala.Serializable)x$4 -> FutureConverters.CompletionStageOps$.MODULE$.toScala$extension(FutureConverters$.MODULE$.CompletionStageOps((CompletionStage)sqsClient.receiveMessage(x$4))));
        }
        // NOTE(review): BalancingMapAsync is defined elsewhere; presumably its third argument
        // computes the parallelism to use next from the latest response and the current value
        // (the current value, x$6, is ignored by the helper) - verify against that class.
        return new BalancingMapAsync(parallelism, (Function1 & Serializable & scala.Serializable)x$5 -> FutureConverters.CompletionStageOps$.MODULE$.toScala$extension(FutureConverters$.MODULE$.CompletionStageOps((CompletionStage)sqsClient.receiveMessage(x$5))), (Function2 & Serializable & scala.Serializable)(response, x$6) -> BoxesRunTime.boxToInteger((int)SqsSource$.$anonfun$resolveHandler$3(parallelism, response, BoxesRunTime.unboxToInt((Object)x$6))));
    }

    /**
     * Synthetic body of the {@code takeWhile} predicate used in {@code apply}.
     *
     * @param settings$1 settings captured from {@code apply}
     * @param messages   the batch of messages taken from one receive response
     * @return {@code false} only when {@code closeOnEmptyReceive()} is set and the batch is
     *         empty; {@code true} otherwise
     */
    public static final /* synthetic */ boolean $anonfun$apply$4(SqsSourceSettings settings$1, List messages) {
        return !settings$1.closeOnEmptyReceive() || messages.nonEmpty();
    }

    /**
     * Synthetic body of the two-argument function passed to {@code BalancingMapAsync}.
     *
     * @param parallelism$1 the parallelism captured from {@code resolveHandler}
     * @param response      the receive response being inspected
     * @param x$6           second lambda argument; not used by this method
     * @return {@code 1} when the response contains no messages, otherwise {@code parallelism$1}
     */
    public static final /* synthetic */ int $anonfun$resolveHandler$3(int parallelism$1, ReceiveMessageResponse response, int x$6) {
        if (response.messages().isEmpty()) {
            return 1;
        }
        return parallelism$1;
    }

    /** Private constructor; publishes the newly created instance as {@link #MODULE$}. */
    private SqsSource$() {
        MODULE$ = this;
    }
}

