/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.jms;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.jms.AutoScaler;
import org.apache.beam.sdk.io.jms.AutoValue_JmsIO_Read;
import org.apache.beam.sdk.io.jms.AutoValue_JmsIO_Write;
import org.apache.beam.sdk.io.jms.DefaultAutoscaler;
import org.apache.beam.sdk.io.jms.JmsCheckpointMark;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.io.jms.WriteJmsResult;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class JmsIO {
    // Class-wide SLF4J logger; also used by the nested writer DoFn and the unbounded reader.
    private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
    // Close timeout (60 000 ms) that read() and readMessage() pass to the Read builder by default.
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.millis((long)60000L);

    /**
     * Creates a {@link Read} that emits {@link JmsRecord}s.
     *
     * <p>The returned transform is preconfigured with no record limit ({@code Long.MAX_VALUE}),
     * a serializable coder for {@code JmsRecord}, the default close timeout, and a mapper that
     * casts every incoming message to {@link TextMessage} and copies its headers, properties
     * and text into a {@code JmsRecord}.
     *
     * @return the preconfigured read transform
     */
    public static Read<JmsRecord> read() {
        MessageMapper<JmsRecord> textMessageMapper = new MessageMapper<JmsRecord>(){

            @Override
            public JmsRecord mapMessage(Message message) throws Exception {
                TextMessage text = (TextMessage)message;
                // Copy every message property into a plain map keyed by property name.
                HashMap<String, Object> props = new HashMap<String, Object>();
                for (Enumeration names = text.getPropertyNames(); names.hasMoreElements(); ) {
                    String name = (String)names.nextElement();
                    props.put(name, text.getObjectProperty(name));
                }
                return new JmsRecord(
                    text.getJMSMessageID(),
                    text.getJMSTimestamp(),
                    text.getJMSCorrelationID(),
                    text.getJMSReplyTo(),
                    text.getJMSDestination(),
                    text.getJMSDeliveryMode(),
                    text.getJMSRedelivered(),
                    text.getJMSType(),
                    text.getJMSExpiration(),
                    text.getJMSPriority(),
                    props,
                    text.getText());
            }
        };
        Read.Builder<JmsRecord> builder = new AutoValue_JmsIO_Read.Builder<JmsRecord>();
        builder = builder.setMaxNumRecords(Long.MAX_VALUE);
        builder = builder.setCoder(SerializableCoder.of(JmsRecord.class));
        builder = builder.setCloseTimeout(DEFAULT_CLOSE_TIMEOUT);
        builder = builder.setMessageMapper(textMessageMapper);
        return builder.build();
    }

    /**
     * Creates a {@link Read} with no record limit ({@code Long.MAX_VALUE}) and the default
     * close timeout. No message mapper or coder is set here; {@code Read.expand} requires both.
     *
     * @param <T> element type produced by the read
     * @return the partially configured read transform
     */
    public static <T> Read<T> readMessage() {
        Read.Builder<T> builder = new AutoValue_JmsIO_Read.Builder<T>();
        builder = builder.setMaxNumRecords(Long.MAX_VALUE);
        builder = builder.setCloseTimeout(DEFAULT_CLOSE_TIMEOUT);
        return builder.build();
    }

    /**
     * Creates an unconfigured {@link Write}; every setting is left at the builder's defaults.
     *
     * @param <EventT> type of the elements to be written
     * @return the empty write transform
     */
    public static <EventT> Write<EventT> write() {
        Write.Builder<EventT> builder = new AutoValue_JmsIO_Write.Builder<EventT>();
        return builder.build();
    }

    // Private no-op constructor: the class only exposes static factory methods and nested types.
    private JmsIO() {
    }

    @AutoValue
    public static abstract class Write<EventT>
    extends PTransform<PCollection<EventT>, WriteJmsResult<EventT>> {
        // Factory used by WriterFn.setup() to open the JMS connection; expand() requires it to be set.
        abstract @Nullable ConnectionFactory getConnectionFactory();

        // Name of the queue to write to, or null when a topic or topic-name mapper is used instead.
        abstract @Nullable String getQueue();

        // Name of the topic to write to, or null when a queue or topic-name mapper is used instead.
        abstract @Nullable String getTopic();

        // Optional user name; when non-null the connection is created with user name and password.
        abstract @Nullable String getUsername();

        // Password passed along with the user name when creating the connection.
        abstract @Nullable String getPassword();

        // Function turning an element plus the JMS session into the message to send; required by expand().
        abstract @Nullable SerializableBiFunction<EventT, Session, Message> getValueMapper();

        // Optional function computing a per-element topic name; overrides the static destination in WriterFn.
        abstract @Nullable SerializableFunction<EventT, String> getTopicNameMapper();

        // Returns the builder that the with*() methods use to derive a modified Write.
        abstract Builder<EventT> builder();

        /**
         * Returns the {@code Write} built from this transform's builder with the connection factory set.
         *
         * @param connectionFactory JMS connection factory; must not be {@code null}
         * @throws IllegalArgumentException if {@code connectionFactory} is {@code null}
         */
        public Write<EventT> withConnectionFactory(ConnectionFactory connectionFactory) {
            Preconditions.checkArgument(connectionFactory != null, "connectionFactory can not be null");
            Builder<EventT> derived = this.builder();
            return derived.setConnectionFactory(connectionFactory).build();
        }

        /**
         * Returns the {@code Write} built from this transform's builder with the queue name set.
         *
         * @param queue destination queue name; must not be {@code null}
         * @throws IllegalArgumentException if {@code queue} is {@code null}
         */
        public Write<EventT> withQueue(String queue) {
            Preconditions.checkArgument(queue != null, "queue can not be null");
            Builder<EventT> derived = this.builder();
            return derived.setQueue(queue).build();
        }

        /**
         * Returns the {@code Write} built from this transform's builder with the topic name set.
         *
         * @param topic destination topic name; must not be {@code null}
         * @throws IllegalArgumentException if {@code topic} is {@code null}
         */
        public Write<EventT> withTopic(String topic) {
            Preconditions.checkArgument(topic != null, "topic can not be null");
            Builder<EventT> derived = this.builder();
            return derived.setTopic(topic).build();
        }

        /**
         * Returns the {@code Write} built from this transform's builder with the user name set.
         *
         * @param username user name used when creating the connection; must not be {@code null}
         * @throws IllegalArgumentException if {@code username} is {@code null}
         */
        public Write<EventT> withUsername(String username) {
            Preconditions.checkArgument(username != null, "username can not be null");
            Builder<EventT> derived = this.builder();
            return derived.setUsername(username).build();
        }

        /**
         * Returns the {@code Write} built from this transform's builder with the password set.
         *
         * @param password password used when creating the connection; must not be {@code null}
         * @throws IllegalArgumentException if {@code password} is {@code null}
         */
        public Write<EventT> withPassword(String password) {
            Preconditions.checkArgument(password != null, "password can not be null");
            Builder<EventT> derived = this.builder();
            return derived.setPassword(password).build();
        }

        /**
         * Returns the {@code Write} built from this transform's builder with the topic-name mapper set.
         *
         * @param topicNameMapper function computing the topic name for an element; must not be {@code null}
         * @throws IllegalArgumentException if {@code topicNameMapper} is {@code null}
         */
        public Write<EventT> withTopicNameMapper(SerializableFunction<EventT, String> topicNameMapper) {
            Preconditions.checkArgument(topicNameMapper != null, "topicNameMapper can not be null");
            Builder<EventT> derived = this.builder();
            return derived.setTopicNameMapper(topicNameMapper).build();
        }

        /**
         * Returns the {@code Write} built from this transform's builder with the value mapper set.
         *
         * @param valueMapper function turning an element and the session into a JMS message; must not be {@code null}
         * @throws IllegalArgumentException if {@code valueMapper} is {@code null}
         */
        public Write<EventT> withValueMapper(SerializableBiFunction<EventT, Session, Message> valueMapper) {
            Preconditions.checkArgument(valueMapper != null, "valueMapper can not be null");
            Builder<EventT> derived = this.builder();
            return derived.setValueMapper(valueMapper).build();
        }

        /**
         * Validates the configuration and applies the writing {@code ParDo} to {@code input}.
         *
         * <p>Requires a connection factory, exactly one of queue / topic / topic-name mapper,
         * and a value mapper. Elements that could not be sent are emitted on a dedicated
         * output tag, which is wrapped in the returned {@link WriteJmsResult}.
         *
         * @param input elements to publish
         * @return result exposing the collection of failed elements
         * @throws IllegalArgumentException if the configuration is incomplete or ambiguous
         */
        public WriteJmsResult<EventT> expand(PCollection<EventT> input) {
            Preconditions.checkArgument(this.getConnectionFactory() != null, "withConnectionFactory() is required");
            boolean hasDestination = this.getTopicNameMapper() != null || this.getQueue() != null || this.getTopic() != null;
            Preconditions.checkArgument(hasDestination, "Either withTopicNameMapper(topicNameMapper), withQueue(queue), or withTopic(topic) is required");
            Preconditions.checkArgument(this.isExclusiveTopicQueue(), "Only one of withQueue(queue), withTopic(topic), or withTopicNameMapper(function) must be set.");
            Preconditions.checkArgument(this.getValueMapper() != null, "withValueMapper() is required");

            TupleTag<EventT> failedMessagesTag = new TupleTag<EventT>();
            TupleTag<EventT> messagesTag = new TupleTag<EventT>();
            WriterFn<EventT> writerFn = new WriterFn<EventT>(this, failedMessagesTag);
            PCollectionTuple outputs = input.apply(ParDo.of(writerFn).withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag)));

            // Both outputs carry the same element type as the input, so they reuse its coder.
            PCollection<EventT> failedMessages = outputs.get(failedMessagesTag).setCoder(input.getCoder());
            outputs.get(messagesTag).setCoder(input.getCoder());
            return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages);
        }

        /**
         * Tells whether exactly one of queue, topic and topic-name mapper is configured.
         *
         * @return {@code true} when exactly one of the three destination settings is non-null
         */
        private boolean isExclusiveTopicQueue() {
            int configured = 0;
            if (this.getQueue() != null) {
                ++configured;
            }
            if (this.getTopic() != null) {
                ++configured;
            }
            if (this.getTopicNameMapper() != null) {
                ++configured;
            }
            return configured == 1;
        }

        private static class WriterFn<EventT>
        extends DoFn<EventT, EventT> {
            private Write<EventT> spec;
            private Connection connection;
            private Session session;
            private MessageProducer producer;
            private Destination destination;
            private final TupleTag<EventT> failedMessageTag;

            public WriterFn(Write<EventT> spec, TupleTag<EventT> failedMessageTag) {
                this.spec = spec;
                this.failedMessageTag = failedMessageTag;
            }

            @DoFn.Setup
            public void setup() throws Exception {
                if (this.producer == null) {
                    this.connection = this.spec.getUsername() != null ? this.spec.getConnectionFactory().createConnection(this.spec.getUsername(), this.spec.getPassword()) : this.spec.getConnectionFactory().createConnection();
                    this.connection.start();
                    this.session = this.connection.createSession(false, 1);
                    if (this.spec.getQueue() != null) {
                        this.destination = this.session.createQueue(this.spec.getQueue());
                    } else if (this.spec.getTopic() != null) {
                        this.destination = this.session.createTopic(this.spec.getTopic());
                    }
                    this.producer = this.session.createProducer(null);
                }
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext ctx) {
                Destination destinationToSendTo = this.destination;
                try {
                    Message message = (Message)this.spec.getValueMapper().apply(ctx.element(), (Object)this.session);
                    if (this.spec.getTopicNameMapper() != null) {
                        destinationToSendTo = this.session.createTopic((String)this.spec.getTopicNameMapper().apply(ctx.element()));
                    }
                    this.producer.send(destinationToSendTo, message);
                }
                catch (Exception ex) {
                    LOG.error("Error sending message on topic {}", (Object)destinationToSendTo);
                    ctx.output(this.failedMessageTag, ctx.element());
                }
            }

            @DoFn.Teardown
            public void teardown() throws Exception {
                this.producer.close();
                this.producer = null;
                this.session.close();
                this.session = null;
                this.connection.stop();
                this.connection.close();
                this.connection = null;
            }
        }

        // AutoValue builder for Write; one setter per abstract accessor declared on Write.
        @AutoValue.Builder
        static abstract class Builder<EventT> {
            Builder() {
            }

            // Sets the JMS connection factory.
            abstract Builder<EventT> setConnectionFactory(ConnectionFactory var1);

            // Sets the destination queue name.
            abstract Builder<EventT> setQueue(String var1);

            // Sets the destination topic name.
            abstract Builder<EventT> setTopic(String var1);

            // Sets the user name used when creating the connection.
            abstract Builder<EventT> setUsername(String var1);

            // Sets the password used when creating the connection.
            abstract Builder<EventT> setPassword(String var1);

            // Sets the function that converts an element and session into a JMS message.
            abstract Builder<EventT> setValueMapper(SerializableBiFunction<EventT, Session, Message> var1);

            // Sets the function that computes a per-element topic name.
            abstract Builder<EventT> setTopicNameMapper(SerializableFunction<EventT, String> var1);

            // Produces the Write value from the settings collected so far.
            abstract Write<EventT> build();
        }
    }

    static class UnboundedJmsReader<T>
    extends UnboundedSource.UnboundedReader<T> {
        // Source that created this reader; gives access to the Read configuration (spec).
        private UnboundedJmsSource<T> source;
        // Collects every received message (see advance()) and supplies the watermark.
        private JmsCheckpointMark checkpointMark;
        // JMS resources opened in start() and released by the close*() helpers.
        private Connection connection;
        private Session session;
        private MessageConsumer consumer;
        // Backlog estimator; set in start(), reset to null in closeAutoscaler().
        private AutoScaler autoScaler;
        // Result of mapping the most recently received message; null when none is available.
        private T currentMessage;
        // JMS timestamp of the most recently received message.
        private Instant currentTimestamp;
        // Pipeline options; used in doClose() to obtain the scheduled executor service.
        private PipelineOptions options;

        /**
         * Creates a reader with a fresh, empty checkpoint mark and no current message.
         *
         * @param source source this reader belongs to
         * @param options pipeline options, kept for later use when closing
         */
        public UnboundedJmsReader(UnboundedJmsSource<T> source, PipelineOptions options) {
            this.options = options;
            this.source = source;
            this.currentMessage = null;
            this.checkpointMark = new JmsCheckpointMark();
        }

        /**
         * Opens the JMS connection, starts the auto scaler, creates a client-acknowledge session
         * and a consumer on the configured topic or queue, then tries to read a first message.
         *
         * <p>The connection is stored in the field as soon as it is created, so that the close
         * path can still release it when a later step (starting the connection or auto scaler)
         * fails.
         *
         * @return {@code true} if a first message was available
         * @throws IOException if the connection, session or consumer cannot be set up, or the
         *     first read fails
         */
        public boolean start() throws IOException {
            Read<T> spec = this.source.spec;
            ConnectionFactory connectionFactory = spec.getConnectionFactory();
            try {
                this.connection = spec.getUsername() != null
                    ? connectionFactory.createConnection(spec.getUsername(), spec.getPassword())
                    : connectionFactory.createConnection();
                this.connection.start();
                this.autoScaler = spec.getAutoScaler() == null ? new DefaultAutoscaler() : spec.getAutoScaler();
                this.autoScaler.start();
            }
            catch (Exception e) {
                throw new IOException("Error connecting to JMS", e);
            }
            try {
                this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            }
            catch (Exception e) {
                throw new IOException("Error creating JMS session", e);
            }
            try {
                Destination source = spec.getTopic() != null
                    ? this.session.createTopic(spec.getTopic())
                    : this.session.createQueue(spec.getQueue());
                this.consumer = this.session.createConsumer(source);
            }
            catch (Exception e) {
                throw new IOException("Error creating JMS consumer", e);
            }
            return this.advance();
        }

        /**
         * Polls the consumer without blocking. When a message is available it is added to the
         * checkpoint mark, mapped to the current element, and its JMS timestamp is recorded.
         *
         * @return {@code true} if a message was read, {@code false} if none was available
         * @throws IOException wrapping any exception raised while receiving or mapping
         */
        public boolean advance() throws IOException {
            try {
                Message received = this.consumer.receiveNoWait();
                if (received != null) {
                    this.checkpointMark.add(received);
                    this.currentMessage = this.source.spec.getMessageMapper().mapMessage(received);
                    this.currentTimestamp = new Instant(received.getJMSTimestamp());
                    return true;
                }
                this.currentMessage = null;
                return false;
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }

        /**
         * Returns the element produced by the last successful read.
         *
         * @throws NoSuchElementException if there is no current element
         */
        public T getCurrent() throws NoSuchElementException {
            if (this.currentMessage != null) {
                return this.currentMessage;
            }
            throw new NoSuchElementException();
        }

        /** Returns the oldest message timestamp reported by the checkpoint mark. */
        public Instant getWatermark() {
            Instant oldest = this.checkpointMark.getOldestMessageTimestamp();
            return oldest;
        }

        /**
         * Returns the JMS timestamp recorded for the current element.
         *
         * @throws NoSuchElementException if there is no current element
         */
        public Instant getCurrentTimestamp() {
            if (this.currentMessage != null) {
                return this.currentTimestamp;
            }
            throw new NoSuchElementException();
        }

        /** Returns this reader's checkpoint mark (the same instance on every call). */
        public UnboundedSource.CheckpointMark getCheckpointMark() {
            JmsCheckpointMark mark = this.checkpointMark;
            return mark;
        }

        /**
         * Returns the backlog size reported by the auto scaler.
         *
         * <p>The auto scaler only exists between {@code start()} and the close path, which resets
         * it to {@code null}; outside that window {@code BACKLOG_UNKNOWN} is returned instead of
         * failing with a {@code NullPointerException}.
         *
         * @return the auto scaler's backlog in bytes, or {@code BACKLOG_UNKNOWN} when no auto
         *     scaler is active
         */
        public long getTotalBacklogBytes() {
            // Read the field once so a concurrent close cannot null it between check and call.
            AutoScaler scaler = this.autoScaler;
            return scaler == null ? BACKLOG_UNKNOWN : scaler.getTotalBacklogBytes();
        }

        /** Returns the source this reader was created from. */
        public UnboundedSource<T, ?> getCurrentSource() {
            UnboundedJmsSource<T> owner = this.source;
            return owner;
        }

        /** Closes the reader by delegating to {@code doClose()}. */
        public void close() {
            doClose();
        }

        /**
         * Stops the auto scaler and consumer immediately, then schedules the release of the
         * checkpoint mark, session and connection after the configured close timeout.
         *
         * <p>If the delayed task cannot be scheduled (for example because the executor rejects
         * it), the session and connection are released right away instead of being left open.
         * NOTE(review): the immediate fallback discards the checkpoint mark without waiting for
         * the close timeout; confirm this is acceptable for in-flight checkpoints.
         */
        private void doClose() {
            Runnable releaseSessionAndConnection = () -> {
                this.checkpointMark.discard();
                this.closeSession();
                this.closeConnection();
            };
            try {
                this.closeAutoscaler();
                this.closeConsumer();
                ScheduledExecutorService executorService = this.options.as(ExecutorOptions.class).getScheduledExecutorService();
                Duration closeTimeout = this.source.spec.getCloseTimeout();
                executorService.schedule(() -> {
                    LOG.debug("Closing session and connection after delay {}", closeTimeout);
                    releaseSessionAndConnection.run();
                }, closeTimeout.getMillis(), TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                LOG.error("Error closing reader", e);
                // Scheduling failed, so nothing will run later: release the resources now.
                try {
                    releaseSessionAndConnection.run();
                }
                catch (Exception releaseFailure) {
                    LOG.error("Error closing reader", releaseFailure);
                }
            }
        }

        /** Stops and closes the connection if one is open; failures are logged, not rethrown. */
        private void closeConnection() {
            if (this.connection == null) {
                return;
            }
            try {
                this.connection.stop();
                this.connection.close();
                this.connection = null;
            }
            catch (Exception e) {
                LOG.error("Error closing connection", e);
            }
        }

        /**
         * Closes the session if one is open; failures are logged, not rethrown.
         *
         * <p>The log message uses the same fixed text plus throwable form as the other close
         * helpers; the exception message is already part of the logged throwable.
         */
        private void closeSession() {
            try {
                if (this.session != null) {
                    this.session.close();
                    this.session = null;
                }
            }
            catch (Exception e) {
                LOG.error("Error closing session", e);
            }
        }

        /** Closes the consumer if one is open; failures are logged, not rethrown. */
        private void closeConsumer() {
            if (this.consumer == null) {
                return;
            }
            try {
                this.consumer.close();
                this.consumer = null;
            }
            catch (Exception e) {
                LOG.error("Error closing consumer", e);
            }
        }

        /** Stops the auto scaler if one is active; failures are logged, not rethrown. */
        private void closeAutoscaler() {
            if (this.autoScaler == null) {
                return;
            }
            try {
                this.autoScaler.stop();
                this.autoScaler = null;
            }
            catch (Exception e) {
                LOG.error("Error closing autoscaler", e);
            }
        }

        /** Runs the same close sequence as {@code close()} when the reader is finalized. */
        protected void finalize() {
            doClose();
        }
    }

    /**
     * Unbounded source backed by a {@link Read} configuration. Every split shares the same
     * configuration object.
     */
    static class UnboundedJmsSource<T>
    extends UnboundedSource<T, JmsCheckpointMark> {
        private final Read<T> spec;

        /**
         * @param spec read configuration used by this source and its readers
         */
        public UnboundedJmsSource(Read<T> spec) {
            this.spec = spec;
        }

        /**
         * Splits the source: a topic always yields a single source, while a queue yields
         * {@code desiredNumSplits} sources.
         *
         * @param desiredNumSplits number of sources requested for a queue
         * @param options pipeline options (unused)
         * @return the list of sources, all sharing this source's configuration
         */
        public List<UnboundedJmsSource<T>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
            int splitCount = this.spec.getTopic() != null ? 1 : desiredNumSplits;
            ArrayList<UnboundedJmsSource<T>> result = new ArrayList<UnboundedJmsSource<T>>();
            int created = 0;
            while (created < splitCount) {
                result.add(new UnboundedJmsSource<T>(this.spec));
                ++created;
            }
            return result;
        }

        /**
         * Creates a new reader for this source. The supplied checkpoint mark is not used; the
         * reader starts with its own fresh mark.
         */
        public UnboundedJmsReader<T> createReader(PipelineOptions options, JmsCheckpointMark checkpointMark) {
            UnboundedJmsReader<T> reader = new UnboundedJmsReader<T>(this, options);
            return reader;
        }

        /** Returns a serializable coder for {@link JmsCheckpointMark}. */
        public Coder<JmsCheckpointMark> getCheckpointMarkCoder() {
            Coder<JmsCheckpointMark> markCoder = SerializableCoder.of(JmsCheckpointMark.class);
            return markCoder;
        }

        /** Returns the element coder configured on the read transform. */
        public Coder<T> getOutputCoder() {
            Coder<T> elementCoder = this.spec.getCoder();
            return elementCoder;
        }
    }

    /**
     * Serializable function converting a received JMS {@link Message} into an element of type
     * {@code T}. The reader applies it to every message it receives.
     */
    @FunctionalInterface
    public static interface MessageMapper<T>
    extends Serializable {
        /**
         * Converts one JMS message.
         *
         * @param var1 the received message
         * @return the element to emit for that message
         * @throws Exception if the message cannot be converted
         */
        public T mapMessage(Message var1) throws Exception;
    }

    @AutoValue
    public static abstract class Read<T>
    extends PTransform<PBegin, PCollection<T>> {
        // Factory used by the reader to open the JMS connection; expand() requires it to be set.
        abstract @Nullable ConnectionFactory getConnectionFactory();

        // Name of the queue to read from; exclusive with the topic.
        abstract @Nullable String getQueue();

        // Name of the topic to read from; exclusive with the queue.
        abstract @Nullable String getTopic();

        // Optional user name; when non-null the connection is created with user name and password.
        abstract @Nullable String getUsername();

        // Password passed along with the user name when creating the connection.
        abstract @Nullable String getPassword();

        // Record limit; Long.MAX_VALUE (the factory default) means no limit is applied in expand().
        abstract long getMaxNumRecords();

        // Optional read-time limit; when set, expand() applies it to the unbounded read.
        abstract @Nullable Duration getMaxReadTime();

        // Converts each received message to an element; expand() requires it to be set.
        abstract @Nullable MessageMapper<T> getMessageMapper();

        // Coder for the produced elements; expand() requires it to be set.
        abstract @Nullable Coder<T> getCoder();

        // Optional auto scaler; the reader falls back to DefaultAutoscaler when this is null.
        abstract @Nullable AutoScaler getAutoScaler();

        // Delay the reader waits before closing its session and connection.
        abstract Duration getCloseTimeout();

        // Returns the builder that the with*() methods use to derive a modified Read.
        abstract Builder<T> builder();

        /**
         * Returns the {@code Read} built from this transform's builder with the connection factory set.
         *
         * @param connectionFactory JMS connection factory; must not be {@code null}
         * @throws IllegalArgumentException if {@code connectionFactory} is {@code null}
         */
        public Read<T> withConnectionFactory(ConnectionFactory connectionFactory) {
            Preconditions.checkArgument(connectionFactory != null, "connectionFactory can not be null");
            Builder<T> derived = this.builder();
            return derived.setConnectionFactory(connectionFactory).build();
        }

        /**
         * Returns the {@code Read} built from this transform's builder with the queue name set.
         *
         * @param queue source queue name; must not be {@code null}
         * @throws IllegalArgumentException if {@code queue} is {@code null}
         */
        public Read<T> withQueue(String queue) {
            Preconditions.checkArgument(queue != null, "queue can not be null");
            Builder<T> derived = this.builder();
            return derived.setQueue(queue).build();
        }

        /**
         * Returns the {@code Read} built from this transform's builder with the topic name set.
         *
         * @param topic source topic name; must not be {@code null}
         * @throws IllegalArgumentException if {@code topic} is {@code null}
         */
        public Read<T> withTopic(String topic) {
            Preconditions.checkArgument(topic != null, "topic can not be null");
            Builder<T> derived = this.builder();
            return derived.setTopic(topic).build();
        }

        /**
         * Returns the {@code Read} built from this transform's builder with the user name set.
         *
         * @param username user name used when creating the connection; must not be {@code null}
         * @throws IllegalArgumentException if {@code username} is {@code null}
         */
        public Read<T> withUsername(String username) {
            Preconditions.checkArgument(username != null, "username can not be null");
            Builder<T> derived = this.builder();
            return derived.setUsername(username).build();
        }

        /**
         * Returns the {@code Read} built from this transform's builder with the password set.
         *
         * @param password password used when creating the connection; must not be {@code null}
         * @throws IllegalArgumentException if {@code password} is {@code null}
         */
        public Read<T> withPassword(String password) {
            Preconditions.checkArgument(password != null, "password can not be null");
            Builder<T> derived = this.builder();
            return derived.setPassword(password).build();
        }

        /**
         * Returns the {@code Read} built from this transform's builder with the record limit set.
         *
         * <p>Zero is accepted, so the error message states the actual bound ({@code >= 0}).
         *
         * @param maxNumRecords maximum number of records to read; must be non-negative
         * @throws IllegalArgumentException if {@code maxNumRecords} is negative
         */
        public Read<T> withMaxNumRecords(long maxNumRecords) {
            Preconditions.checkArgument(maxNumRecords >= 0L, "maxNumRecords must be >= 0, but was: %s", maxNumRecords);
            return this.builder().setMaxNumRecords(maxNumRecords).build();
        }

        /**
         * Returns the {@code Read} built from this transform's builder with the read-time limit set.
         *
         * @param maxReadTime maximum duration to read for; must not be {@code null}
         * @throws IllegalArgumentException if {@code maxReadTime} is {@code null}
         */
        public Read<T> withMaxReadTime(Duration maxReadTime) {
            Preconditions.checkArgument(maxReadTime != null, "maxReadTime can not be null");
            Builder<T> derived = this.builder();
            return derived.setMaxReadTime(maxReadTime).build();
        }

        /**
         * Returns the {@code Read} built from this transform's builder with the message mapper set.
         *
         * @param messageMapper converter from JMS message to element; must not be {@code null}
         * @throws IllegalArgumentException if {@code messageMapper} is {@code null}
         */
        public Read<T> withMessageMapper(MessageMapper<T> messageMapper) {
            Preconditions.checkArgument(messageMapper != null, "messageMapper can not be null");
            Builder<T> derived = this.builder();
            return derived.setMessageMapper(messageMapper).build();
        }

        /**
         * Returns the {@code Read} built from this transform's builder with the element coder set.
         *
         * @param coder coder for the produced elements; must not be {@code null}
         * @throws IllegalArgumentException if {@code coder} is {@code null}
         */
        public Read<T> withCoder(Coder<T> coder) {
            Preconditions.checkArgument(coder != null, "coder can not be null");
            Builder<T> derived = this.builder();
            return derived.setCoder(coder).build();
        }

        /**
         * Returns the {@code Read} built from this transform's builder with the auto scaler set.
         *
         * @param autoScaler auto scaler the reader should use; must not be {@code null}
         * @throws IllegalArgumentException if {@code autoScaler} is {@code null}
         */
        public Read<T> withAutoScaler(AutoScaler autoScaler) {
            Preconditions.checkArgument(autoScaler != null, "autoScaler can not be null");
            Builder<T> derived = this.builder();
            return derived.setAutoScaler(autoScaler).build();
        }

        /**
         * Returns the {@code Read} built from this transform's builder with the close timeout set.
         *
         * @param closeTimeout delay before the reader closes its session and connection; must be
         *     non-null and non-negative
         * @throws IllegalArgumentException if {@code closeTimeout} is {@code null} or negative
         */
        public Read<T> withCloseTimeout(Duration closeTimeout) {
            Preconditions.checkArgument(closeTimeout != null, "closeTimeout can not be null");
            long timeoutMillis = closeTimeout.getMillis();
            Preconditions.checkArgument(timeoutMillis >= 0L, "Close timeout must be non-negative.");
            Builder<T> derived = this.builder();
            return derived.setCloseTimeout(closeTimeout).build();
        }

        /**
         * Validates the configuration and applies the read to the pipeline.
         *
         * <p>Requires a connection factory, exactly one of queue / topic, a message mapper and a
         * coder. When a record limit below {@code Long.MAX_VALUE} or a maximum read time is set,
         * the unbounded read is wrapped with those limits.
         *
         * @param input pipeline start
         * @return the collection of mapped messages
         * @throws IllegalArgumentException if the configuration is incomplete or ambiguous
         */
        public PCollection<T> expand(PBegin input) {
            Preconditions.checkArgument(this.getConnectionFactory() != null, "withConnectionFactory() is required");
            Preconditions.checkArgument(this.getQueue() != null || this.getTopic() != null, "Either withQueue() or withTopic() is required");
            Preconditions.checkArgument(this.getQueue() == null || this.getTopic() == null, "withQueue() and withTopic() are exclusive");
            Preconditions.checkArgument(this.getMessageMapper() != null, "withMessageMapper() is required");
            Preconditions.checkArgument(this.getCoder() != null, "withCoder() is required");
            // Fully qualified: inside this class the simple name Read refers to JmsIO.Read.
            org.apache.beam.sdk.io.Read.Unbounded<T> unbounded = org.apache.beam.sdk.io.Read.from(this.createSource());
            // Declared as a PTransform because the limited variant is not a Read.Unbounded.
            PTransform<PBegin, PCollection<T>> transform = unbounded;
            if (this.getMaxNumRecords() < Long.MAX_VALUE || this.getMaxReadTime() != null) {
                transform = unbounded.withMaxReadTime(this.getMaxReadTime()).withMaxNumRecords(this.getMaxNumRecords());
            }
            return input.getPipeline().apply(transform);
        }

        /**
         * Adds the queue and topic names to the display data, each only when non-null.
         *
         * @param builder display data builder to populate
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            String queueName = this.getQueue();
            builder.addIfNotNull(DisplayData.item("queue", queueName));
            String topicName = this.getTopic();
            builder.addIfNotNull(DisplayData.item("topic", topicName));
        }

        /** Creates the unbounded source backed by this read configuration. */
        UnboundedSource<T, JmsCheckpointMark> createSource() {
            UnboundedJmsSource<T> jmsSource = new UnboundedJmsSource<T>(this);
            return jmsSource;
        }

        // AutoValue builder for Read; one setter per abstract accessor declared on Read.
        @AutoValue.Builder
        static abstract class Builder<T> {
            Builder() {
            }

            // Sets the JMS connection factory.
            abstract Builder<T> setConnectionFactory(ConnectionFactory var1);

            // Sets the source queue name.
            abstract Builder<T> setQueue(String var1);

            // Sets the source topic name.
            abstract Builder<T> setTopic(String var1);

            // Sets the user name used when creating the connection.
            abstract Builder<T> setUsername(String var1);

            // Sets the password used when creating the connection.
            abstract Builder<T> setPassword(String var1);

            // Sets the maximum number of records to read.
            abstract Builder<T> setMaxNumRecords(long var1);

            // Sets the maximum duration to read for.
            abstract Builder<T> setMaxReadTime(Duration var1);

            // Sets the converter from JMS message to element.
            abstract Builder<T> setMessageMapper(MessageMapper<T> var1);

            // Sets the coder for the produced elements.
            abstract Builder<T> setCoder(Coder<T> var1);

            // Sets the auto scaler used by the reader.
            abstract Builder<T> setAutoScaler(AutoScaler var1);

            // Sets the delay before the reader closes its session and connection.
            abstract Builder<T> setCloseTimeout(Duration var1);

            // Produces the Read value from the settings collected so far.
            abstract Read<T> build();
        }
    }
}

