/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx2.aws;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.github.davidmoten.guavamini.Preconditions;
import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
import com.github.davidmoten.rx2.aws.SqsMessage;
import com.github.davidmoten.rx2.aws.SqsQueue;
import com.github.davidmoten.rx2.aws.Util;
import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.functions.BiConsumer;
import io.reactivex.schedulers.Schedulers;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;

public final class Sqs {
    /** Prefix that {@link #queueUrl(String)} requires a queue url to start with. */
    private static final String HTTPS = "https://";

    /** Private so the class cannot be instantiated; every member of this class is static. */
    private Sqs() {
    }

    /**
     * Starts a builder for the queue identified by name.
     *
     * @param queueName the queue name handed to {@link SqsQueue#fromQueueName(String)}
     * @return a new builder bound to that queue
     */
    public static SqsBuilder queueName(String queueName) {
        SqsQueue queue = SqsQueue.fromQueueName(queueName);
        return new SqsBuilder(queue);
    }

    /**
     * Starts a builder that carries the owner account id along with the queue name.
     *
     * @param ownerAccountId the account id stored in the returned builder
     * @return an intermediate builder holding {@code ownerAccountId}
     */
    public static BuilderWithOwnerAccountId ownerAccountId(String ownerAccountId) {
        BuilderWithOwnerAccountId builder = new BuilderWithOwnerAccountId(ownerAccountId);
        return builder;
    }

    /**
     * Starts a builder for the queue identified by its url.
     *
     * @param queueUrl the queue url; must start with {@code https://}
     * @return a new builder bound to that queue
     * @throws IllegalArgumentException presumably, when the url does not start with
     *         {@code https://} (thrown by {@code Preconditions.checkArgument})
     */
    public static SqsBuilder queueUrl(String queueUrl) {
        boolean isHttps = queueUrl.startsWith(HTTPS);
        Preconditions.checkArgument(isHttps, "queueUrl must be an https url: " + queueUrl);
        SqsQueue queue = SqsQueue.fromQueueUrl(queueUrl);
        return new SqsBuilder(queue);
    }

    /**
     * Stores {@code message} in S3 under an id obtained from {@code s3IdFactory} and then
     * sends that id as the body of an SQS message.
     *
     * <p>If sending to SQS fails, the S3 object that was just written is deleted again and
     * the original send failure is rethrown unchanged. Only when that clean-up delete fails
     * as well are both failures reported together in a {@link CompositeException}.
     *
     * @param sqs client used to send the SQS message
     * @param queueUrl url of the destination queue
     * @param s3 client used to store (and, on failure, delete) the payload
     * @param bucketName bucket the payload is written to
     * @param headers headers copied onto the S3 object's metadata; may be empty
     * @param message payload bytes
     * @param s3IdFactory supplies the S3 key, which is also the SQS message body
     * @return the S3 key that was used
     * @throws NullPointerException if any argument is null
     * @throws CompositeException if the SQS send fails and the clean-up delete fails too
     */
    public static String sendToQueueUsingS3(AmazonSQS sqs, String queueUrl, AmazonS3 s3, String bucketName, Map<String, String> headers, byte[] message, Callable<String> s3IdFactory) {
        // validate everything up front so nothing is written to S3 on a bad argument
        Preconditions.checkNotNull(sqs);
        Preconditions.checkNotNull(s3);
        Preconditions.checkNotNull(queueUrl);
        Preconditions.checkNotNull(bucketName);
        Preconditions.checkNotNull(headers);
        Preconditions.checkNotNull(message);
        Preconditions.checkNotNull(s3IdFactory);
        String s3Id = Util.uncheckedCall(s3IdFactory);
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(message.length);
        headers.forEach(metadata::setHeader);
        s3.putObject(bucketName, s3Id, new ByteArrayInputStream(message), metadata);
        try {
            sqs.sendMessage(queueUrl, s3Id);
        } catch (RuntimeException e) {
            try {
                s3.deleteObject(bucketName, s3Id);
            } catch (RuntimeException e2) {
                throw new CompositeException(e, e2);
            }
            // rethrow outside the inner try so the send failure is not caught and
            // wrapped as if the delete had failed
            throw e;
        }
        return s3Id;
    }

    /**
     * Same as the seven-argument overload but with no extra S3 headers.
     *
     * @return the S3 key that was used
     */
    public static String sendToQueueUsingS3(AmazonSQS sqs, String queueUrl, AmazonS3 s3, String bucketName, byte[] message, Callable<String> s3IdFactory) {
        Map<String, String> noHeaders = Collections.emptyMap();
        return sendToQueueUsingS3(sqs, queueUrl, s3, bucketName, noHeaders, message, s3IdFactory);
    }

    /**
     * Sends {@code message} via S3 with no extra headers, using a random UUID with the
     * dashes removed as the S3 key.
     *
     * @return the S3 key that was used
     */
    public static String sendToQueueUsingS3(AmazonSQS sqs, String queueUrl, AmazonS3 s3, String bucketName, byte[] message) {
        Callable<String> randomId = () -> UUID.randomUUID().toString().replace("-", "");
        return sendToQueueUsingS3(sqs, queueUrl, s3, bucketName, message, randomId);
    }

    /**
     * Sends {@code message} via S3 with the given headers, using a random UUID with the
     * dashes removed as the S3 key.
     *
     * @return the S3 key that was used
     */
    public static String sendToQueueUsingS3(AmazonSQS sqs, String queueUrl, AmazonS3 s3, String bucketName, Map<String, String> headers, byte[] message) {
        Callable<String> randomId = () -> UUID.randomUUID().toString().replace("-", "");
        return sendToQueueUsingS3(sqs, queueUrl, s3, bucketName, headers, message, randomId);
    }

    /**
     * Builds the message stream: an SQS client is obtained from {@code sqsFactory} for the
     * flow, handed to {@code createFlowableWithSqs}, and {@code shutdown()} is registered
     * as the resource disposer of the surrounding {@code Flowable.using}.
     *
     * @throws NullPointerException if any argument is null
     */
    static Flowable<SqsMessage> messages(Callable<AmazonSQS> sqsFactory, Optional<Callable<AmazonS3>> s3Factory, SqsQueue queue, Optional<String> bucketName, Optional<Flowable<Integer>> waitTimesSeconds, Consumer<? super String> logger, Runnable prePoll, Consumer<? super Optional<Throwable>> postPoll) {
        Preconditions.checkNotNull(sqsFactory);
        Preconditions.checkNotNull(s3Factory);
        Preconditions.checkNotNull(queue);
        Preconditions.checkNotNull(bucketName);
        Preconditions.checkNotNull(waitTimesSeconds);
        Preconditions.checkNotNull(logger);
        Preconditions.checkNotNull(prePoll);
        Preconditions.checkNotNull(postPoll);
        return Flowable.using(
                sqsFactory,
                client -> createFlowableWithSqs(client, s3Factory, sqsFactory, queue, bucketName, waitTimesSeconds, logger, prePoll, postPoll),
                client -> client.shutdown());
    }

    /**
     * Wraps the stream in a second {@code Flowable.using} that owns the optional S3 client:
     * the client is created from {@code s3Factory} when one is present and passed to
     * {@code Util.shutdown} by the resource disposer.
     */
    private static Flowable<SqsMessage> createFlowableWithSqs(AmazonSQS sqs, Optional<Callable<AmazonS3>> s3Factory, Callable<AmazonSQS> sqsFactory, SqsQueue queue, Optional<String> bucketName, Optional<Flowable<Integer>> waitTimesSeconds, Consumer<? super String> logger, Runnable prePoll, Consumer<? super Optional<Throwable>> postPoll) {
        return Flowable.using(
                () -> s3Factory.map(factory -> (AmazonS3) Util.uncheckedCall(factory)),
                s3Client -> createFlowableWithS3(sqs, s3Factory, sqsFactory, queue, bucketName, s3Client, waitTimesSeconds, logger, prePoll, postPoll),
                s3Client -> s3Client.ifPresent(Util::shutdown));
    }

    /**
     * Creates the {@code SqsMessage.Service} for the resolved queue url and picks the
     * polling strategy: explicit wait times when {@code waitTimesSeconds} is present,
     * continuous long polling otherwise.
     */
    private static Flowable<SqsMessage> createFlowableWithS3(AmazonSQS sqs, Optional<Callable<AmazonS3>> s3Factory, Callable<AmazonSQS> sqsFactory, SqsQueue queue, Optional<String> bucketName, Optional<AmazonS3> s3, Optional<Flowable<Integer>> waitTimesSeconds, Consumer<? super String> logger, Runnable prePoll, Consumer<? super Optional<Throwable>> postPoll) {
        SqsMessage.Service service = new SqsMessage.Service(s3Factory, sqsFactory, s3, sqs, queue.getQueueUrl(sqs), bucketName);
        return waitTimesSeconds
                .map(waits -> createFlowablePolling(waits, service, logger, prePoll, postPoll))
                .orElseGet(() -> createFlowableContinousLongPolling(service, logger, prePoll, postPoll));
    }

    /**
     * Runs one poll sequence (see {@code get}) for every wait time emitted by
     * {@code waitTimesSeconds}.
     */
    private static Flowable<SqsMessage> createFlowablePolling(Flowable<Integer> waitTimesSeconds, SqsMessage.Service service, Consumer<? super String> logger, Runnable prePoll, Consumer<? super Optional<Throwable>> postPoll) {
        // flatMap with maxConcurrency 1: at most one poll sequence is subscribed at a time
        final int maxConcurrency = 1;
        return waitTimesSeconds.flatMap(
                waitSeconds -> get(waitSeconds, service, logger, prePoll, postPoll),
                maxConcurrency);
    }

    /**
     * One poll sequence: a first receive using {@code waitTimeSeconds}, followed by
     * repeated receives with a wait time of 0 until a receive returns no messages.
     * Messages that {@code getNextMessage} resolved to empty are dropped.
     *
     * @param waitTimeSeconds wait time of the first receive call
     * @return the messages received during this sequence
     */
    private static Flowable<SqsMessage> get(int waitTimeSeconds, SqsMessage.Service service, Consumer<? super String> logger, Runnable prePoll, Consumer<? super Optional<Throwable>> postPoll) {
        Flowable<List<Optional<SqsMessage>>> firstPoll = Flowable.fromCallable(() -> {
            logger.accept("polling for messages");
            return sqsMessages(service, waitTimeSeconds, prePoll, postPoll);
        });
        // keep draining without waiting; the takeWhile below ends the repeat
        Flowable<List<Optional<SqsMessage>>> drainPolls = Flowable
                .fromCallable(() -> sqsMessages(service, 0, prePoll, postPoll))
                .repeat();
        return firstPoll
                .concatWith(drainPolls)
                .takeWhile(batch -> !batch.isEmpty())
                .flatMapIterable(batch -> batch)
                .filter(Optional::isPresent)
                .map(Optional::get);
    }

    /**
     * Performs one receive call against the service's queue with the given wait time,
     * wrapped in the {@code prePoll}/{@code postPoll} callbacks.
     */
    private static List<Message> messages(SqsMessage.Service service, int waitTimeSeconds, Runnable prePoll, Consumer<? super Optional<Throwable>> postPoll) {
        Supplier<List<Message>> receive =
                () -> service.sqs.receiveMessage(request(service.queueUrl, waitTimeSeconds)).getMessages();
        return messages(receive, prePoll, postPoll);
    }

    /**
     * Calls {@code supplier} between the poll callbacks: {@code prePoll} runs first, and
     * {@code postPoll} is then given either the failure thrown by the supplier (which is
     * rethrown afterwards) or an empty optional on success.
     *
     * @return the list produced by {@code supplier}
     */
    @VisibleForTesting
    static List<Message> messages(Supplier<? extends List<Message>> supplier, Runnable prePoll, Consumer<? super Optional<Throwable>> postPoll) {
        prePoll.run();
        final List<Message> received;
        try {
            received = supplier.get();
        } catch (Throwable failure) {
            postPoll.accept(Optional.of(failure));
            throw failure;
        }
        // reported outside the try so a failing postPoll is not reported to itself
        postPoll.accept(Optional.empty());
        return received;
    }

    /**
     * Receives one batch of raw messages and converts each with {@code getNextMessage};
     * the result has one entry per received message, empty where conversion yielded nothing.
     */
    private static List<Optional<SqsMessage>> sqsMessages(SqsMessage.Service service, int waitTimeSeconds, Runnable prePoll, Consumer<? super Optional<Throwable>> postPoll) {
        List<Message> received = messages(service, waitTimeSeconds, prePoll, postPoll);
        return received.stream()
                .map(raw -> getNextMessage(raw, service))
                .collect(Collectors.toList());
    }

    /**
     * Creates a generated {@link Flowable} driven by a
     * {@code ContinuousLongPollingSyncOnSubscribe}, which serves both as the initial-state
     * supplier and as the generator callback.
     */
    private static Flowable<SqsMessage> createFlowableContinousLongPolling(SqsMessage.Service service, Consumer<? super String> logger, Runnable prePoll, Consumer<? super Optional<Throwable>> postPoll) {
        ContinuousLongPollingSyncOnSubscribe poller = new ContinuousLongPollingSyncOnSubscribe(service, logger, prePoll, postPoll);
        // no raw-type casts: the poller's declared interfaces carry the type arguments
        return Flowable.generate(poller, poller);
    }

    /**
     * Converts a raw SQS message into an {@code SqsMessage}.
     *
     * <p>Without a bucket name the message body (as UTF-8 bytes) is the payload and the
     * timestamp is the current time. With a bucket name the body is treated as an S3 key:
     * the payload is read from that object and the timestamp is the object's last-modified
     * time. If the object does not exist the SQS message is deleted and nothing is returned.
     *
     * @return the converted message, or empty when the referenced S3 object is missing
     */
    static Optional<SqsMessage> getNextMessage(Message message, SqsMessage.Service service) {
        if (!service.bucketName.isPresent()) {
            // payload travels inline in the SQS message body
            return Optional.of(new SqsMessage(message.getReceiptHandle(), message.getBody().getBytes(StandardCharsets.UTF_8), System.currentTimeMillis(), Optional.empty(), service));
        }

        String s3Id = message.getBody();
        AmazonS3 s3Client = service.s3.get();
        String bucket = service.bucketName.get();
        if (!s3Client.doesObjectExist(bucket, s3Id)) {
            // nothing to read for this key: remove the SQS message and skip it
            service.sqs.deleteMessage(service.queueUrl, message.getReceiptHandle());
            return Optional.empty();
        }

        S3Object stored = service.s3.get().getObject(service.bucketName.get(), s3Id);
        byte[] payload = readAndClose(stored.getObjectContent());
        long lastModified = stored.getObjectMetadata().getLastModified().getTime();
        return Optional.of(new SqsMessage(message.getReceiptHandle(), payload, lastModified, Optional.of(s3Id), service));
    }

    /**
     * Builds the receive request used by the interval/wait-time polling path.
     *
     * @param queueUrl url of the queue to receive from
     * @param waitTimeSeconds wait time for the receive call
     * @return a request asking for up to 10 messages
     */
    private static ReceiveMessageRequest request(String queueUrl, int waitTimeSeconds) {
        // 10 is the largest MaxNumberOfMessages that SQS ReceiveMessage accepts (valid
        // range 1..10); it also matches the continuous long-polling request in this class
        final int maxNumberOfMessages = 10;
        return new ReceiveMessageRequest(queueUrl)
                .withMaxNumberOfMessages(maxNumberOfMessages)
                .withWaitTimeSeconds(waitTimeSeconds);
    }

    /**
     * Reads the stream to its end into a byte array and closes it, whether or not reading
     * succeeded.
     *
     * @param is the stream to drain; must not be null
     * @return every byte read from {@code is}
     * @throws UncheckedIOException if reading or closing the stream fails
     */
    @VisibleForTesting
    static byte[] readAndClose(InputStream is) {
        Preconditions.checkNotNull(is);
        // try-with-resources guarantees the close that the method name promises
        try (InputStream in = is) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                bos.write(buffer, 0, n);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Generator state for continuous long polling: received messages not yet emitted. */
    private static final class State {
        final Queue<Message> queue;

        State(Queue<Message> pending) {
            queue = pending;
        }
    }

    private static final class ContinuousLongPollingSyncOnSubscribe
    implements Callable<State>,
    BiConsumer<State, Emitter<SqsMessage>> {
        private final SqsMessage.Service service;
        private ReceiveMessageRequest request;
        private Consumer<? super String> logger;
        private Runnable prePoll;
        private Consumer<? super Optional<Throwable>> postPoll;

        ContinuousLongPollingSyncOnSubscribe(SqsMessage.Service service, Consumer<? super String> logger, Runnable prePoll, Consumer<? super Optional<Throwable>> postPoll) {
            this.service = service;
            this.logger = logger;
            this.prePoll = prePoll;
            this.postPoll = postPoll;
        }

        @Override
        public State call() {
            this.request = new ReceiveMessageRequest(this.service.queueUrl).withWaitTimeSeconds(Integer.valueOf(20)).withMaxNumberOfMessages(Integer.valueOf(10));
            return new State(new LinkedList<Message>());
        }

        public void accept(State state, Emitter<SqsMessage> emitter) throws Exception {
            Queue<Message> q = state.queue;
            Optional<Object> next = Optional.empty();
            while (!next.isPresent()) {
                while (q.isEmpty()) {
                    this.logger.accept("long polling for messages on queue=" + this.service.queueUrl);
                    List<Message> list = Sqs.messages(() -> this.service.sqs.receiveMessage(this.request).getMessages(), this.prePoll, this.postPoll);
                    q.addAll(list);
                }
                Message message = q.poll();
                next = Sqs.getNextMessage(message, this.service);
            }
            emitter.onNext(next.get());
        }
    }

    /**
     * Intermediate builder step reached after a bucket name has been set; it collects the
     * S3 client factory and then hands control back to the {@link SqsBuilder}.
     */
    public static final class ViaS3Builder {
        private final SqsBuilder parent;

        public ViaS3Builder(SqsBuilder sqsBuilder) {
            parent = sqsBuilder;
        }

        /**
         * Records the S3 client factory on the underlying builder.
         *
         * @param s3Factory creates the S3 client; must not be null
         * @return the underlying {@link SqsBuilder}
         */
        public SqsBuilder s3Factory(Callable<AmazonS3> s3Factory) {
            SqsBuilder builder = parent;
            builder.s3 = Optional.of(s3Factory);
            return builder;
        }
    }

    /**
     * Fluent builder for a {@link Flowable} of messages read from a single queue.
     * {@link #messages()} requires that {@link #sqsFactory(Callable)} has been called.
     */
    public static final class SqsBuilder {
        private final SqsQueue queue;
        private Callable<AmazonSQS> sqs = null;
        private Optional<Callable<AmazonS3>> s3 = Optional.empty();
        private Optional<String> bucketName = Optional.empty();
        private Optional<Flowable<Integer>> waitTimesSeconds = Optional.empty();
        // callbacks default to no-ops
        private Consumer<? super String> logger = x -> {};
        private Runnable prePoll = () -> {};
        private Consumer<? super Optional<Throwable>> postPoll = x -> {};

        SqsBuilder(SqsQueue queue) {
            Preconditions.checkNotNull(queue);
            this.queue = queue;
        }

        /**
         * Sets the bucket name, which makes message bodies be treated as S3 keys.
         *
         * @param bucketName the bucket to read payloads from; must not be null
         * @return the next builder step, which asks for the S3 client factory
         */
        public ViaS3Builder bucketName(String bucketName) {
            this.bucketName = Optional.of(bucketName);
            return new ViaS3Builder(this);
        }

        /**
         * Sets the factory that creates the SQS client.
         *
         * @return this builder
         */
        public SqsBuilder sqsFactory(Callable<AmazonSQS> sqsFactory) {
            this.sqs = sqsFactory;
            return this;
        }

        /**
         * Polls once per emitted value, using it as the wait time of the receive call.
         * Each value is rounded to a whole number of {@code unit} and then converted to
         * whole seconds.
         *
         * @param waitTimesSeconds wait times, expressed in {@code unit}
         * @param unit unit of the emitted values
         * @return this builder
         */
        public SqsBuilder waitTimes(Flowable<? extends Number> waitTimesSeconds, TimeUnit unit) {
            this.waitTimesSeconds = Optional.of(waitTimesSeconds.map(x -> (int) unit.toSeconds(Math.round(x.doubleValue()))));
            return this;
        }

        /**
         * Polls with a wait time of 0 immediately and then on every tick of
         * {@code Flowable.interval(interval, unit, scheduler)}.
         *
         * @return this builder
         */
        public SqsBuilder interval(int interval, TimeUnit unit, Scheduler scheduler) {
            // typed end to end, so no raw or unchecked casts are needed
            Flowable<Integer> zeroWaits = Flowable.just(0)
                    .concatWith(Flowable.interval(interval, unit, scheduler).map(tick -> 0));
            return this.waitTimes(zeroWaits, TimeUnit.SECONDS);
        }

        /**
         * Same as {@link #interval(int, TimeUnit, Scheduler)} using {@code Schedulers.io()}.
         *
         * @return this builder
         */
        public SqsBuilder interval(int interval, TimeUnit unit) {
            return this.interval(interval, unit, Schedulers.io());
        }

        /**
         * Sets the consumer that receives the polling log lines.
         *
         * @return this builder
         */
        public SqsBuilder logger(Consumer<? super String> logger) {
            this.logger = logger;
            return this;
        }

        /**
         * Sets the action run before every receive call.
         *
         * @return this builder
         */
        public SqsBuilder prePoll(Runnable prePoll) {
            this.prePoll = prePoll;
            return this;
        }

        /**
         * Sets the callback invoked after every receive call, with the failure when the
         * call threw and with an empty optional otherwise.
         *
         * @return this builder
         */
        public SqsBuilder postPoll(Consumer<? super Optional<Throwable>> postPoll) {
            this.postPoll = postPoll;
            return this;
        }

        /**
         * Builds the message stream from the collected settings.
         *
         * @return the stream of messages
         * @throws NullPointerException if a required setting (such as the SQS factory) is null
         */
        public Flowable<SqsMessage> messages() {
            return Sqs.messages(this.sqs, this.s3, this.queue, this.bucketName, this.waitTimesSeconds, this.logger, this.prePoll, this.postPoll);
        }
    }

    /**
     * Intermediate builder step returned by {@link Sqs#ownerAccountId(String)}; it holds the
     * owner account id until the queue name is supplied.
     */
    public static final class BuilderWithOwnerAccountId {
        private final String ownerAccountId;

        BuilderWithOwnerAccountId(String ownerAccountId) {
            this.ownerAccountId = ownerAccountId;
        }

        /**
         * Completes the queue identification with the queue name.
         *
         * <p>Public so that callers outside this package can finish the chain started by
         * the public {@link Sqs#ownerAccountId(String)}.
         *
         * @param queueName name of the queue owned by the stored account id
         * @return a builder bound to that queue
         */
        public SqsBuilder queueName(String queueName) {
            return new SqsBuilder(SqsQueue.fromQueueNameAndOwnerAccountId(queueName, this.ownerAccountId));
        }
    }
}

