/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.jms;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.jms.JmsCheckpointMark;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsIO {
    private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);

    public static Read read() {
        return new Read();
    }

    public static Write write() {
        return new Write();
    }

    private JmsIO() {
    }

    public static class Write
    extends PTransform<PCollection<String>, PDone> {
        protected ConnectionFactory connectionFactory;
        protected String queue;
        protected String topic;

        public Write withConnectionFactory(ConnectionFactory connectionFactory) {
            return new Write(connectionFactory, this.queue, this.topic);
        }

        public Write withQueue(String queue) {
            return new Write(this.connectionFactory, queue, this.topic);
        }

        public Write withTopic(String topic) {
            return new Write(this.connectionFactory, this.queue, topic);
        }

        private Write() {
        }

        private Write(ConnectionFactory connectionFactory, String queue, String topic) {
            this.connectionFactory = connectionFactory;
            this.queue = queue;
            this.topic = topic;
        }

        public PDone apply(PCollection<String> input) {
            input.apply((PTransform)ParDo.of((DoFn)new JmsWriter(this.connectionFactory, this.queue, this.topic)));
            return PDone.in((Pipeline)input.getPipeline());
        }

        public void validate(PCollection<String> input) {
            Preconditions.checkNotNull((Object)this.connectionFactory, (Object)"ConnectionFactory is not defined");
            Preconditions.checkArgument((this.queue != null || this.topic != null ? 1 : 0) != 0, (Object)"Either queue or topic is required");
        }

        private static class JmsWriter
        extends DoFn<String, Void> {
            private ConnectionFactory connectionFactory;
            private String queue;
            private String topic;
            private Connection connection;
            private Session session;
            private MessageProducer producer;

            public JmsWriter(ConnectionFactory connectionFactory, String queue, String topic) {
                this.connectionFactory = connectionFactory;
                this.queue = queue;
                this.topic = topic;
            }

            public void startBundle(DoFn.Context c) throws Exception {
                if (this.producer == null) {
                    this.connection = this.connectionFactory.createConnection();
                    this.connection.start();
                    this.session = this.connection.createSession(false, 1);
                    Object destination = this.queue != null ? this.session.createQueue(this.queue) : this.session.createTopic(this.topic);
                    this.producer = this.session.createProducer((Destination)destination);
                }
            }

            public void processElement(DoFn.ProcessContext ctx) throws Exception {
                String value = (String)ctx.element();
                try {
                    TextMessage message = this.session.createTextMessage(value);
                    this.producer.send((Message)message);
                }
                catch (Exception t) {
                    this.finishBundle(null);
                    throw t;
                }
            }

            public void finishBundle(DoFn.Context c) throws Exception {
                this.producer.close();
                this.producer = null;
                this.session.close();
                this.session = null;
                this.connection.stop();
                this.connection.close();
                this.connection = null;
            }
        }
    }

    private static class UnboundedJmsReader
    extends UnboundedSource.UnboundedReader<JmsRecord> {
        private UnboundedJmsSource source;
        private JmsCheckpointMark checkpointMark;
        private Connection connection;
        private Session session;
        private MessageConsumer consumer;
        private JmsRecord currentRecord;
        private Instant currentTimestamp;

        public UnboundedJmsReader(UnboundedJmsSource source, JmsCheckpointMark checkpointMark) {
            this.source = source;
            this.checkpointMark = checkpointMark != null ? checkpointMark : new JmsCheckpointMark();
            this.currentRecord = null;
        }

        public boolean start() throws IOException {
            ConnectionFactory connectionFactory = this.source.connectionFactory;
            try {
                this.connection = connectionFactory.createConnection();
                this.connection.start();
                this.session = this.connection.createSession(false, 1);
                this.consumer = this.source.topic != null ? this.session.createConsumer((Destination)this.session.createTopic(this.source.topic)) : this.session.createConsumer((Destination)this.session.createQueue(this.source.queue));
                return this.advance();
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }

        public boolean advance() throws IOException {
            try {
                TextMessage message = (TextMessage)this.consumer.receiveNoWait();
                if (message == null) {
                    this.currentRecord = null;
                    return false;
                }
                HashMap<String, Object> properties = new HashMap<String, Object>();
                Enumeration propertyNames = message.getPropertyNames();
                while (propertyNames.hasMoreElements()) {
                    String propertyName = (String)propertyNames.nextElement();
                    properties.put(propertyName, message.getObjectProperty(propertyName));
                }
                JmsRecord jmsRecord = new JmsRecord(message.getJMSMessageID(), message.getJMSTimestamp(), message.getJMSCorrelationID(), message.getJMSReplyTo(), message.getJMSDestination(), message.getJMSDeliveryMode(), message.getJMSRedelivered(), message.getJMSType(), message.getJMSExpiration(), message.getJMSPriority(), properties, message.getText());
                this.checkpointMark.addMessage((Message)message);
                this.currentRecord = jmsRecord;
                this.currentTimestamp = new Instant(message.getJMSTimestamp());
                return true;
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }

        public JmsRecord getCurrent() throws NoSuchElementException {
            if (this.currentRecord == null) {
                throw new NoSuchElementException();
            }
            return this.currentRecord;
        }

        public Instant getWatermark() {
            return this.checkpointMark.getOldestPendingTimestamp();
        }

        public Instant getCurrentTimestamp() {
            if (this.currentRecord == null) {
                throw new NoSuchElementException();
            }
            return this.currentTimestamp;
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return this.checkpointMark;
        }

        public UnboundedSource<JmsRecord, ?> getCurrentSource() {
            return this.source;
        }

        public void close() throws IOException {
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                    this.consumer = null;
                }
                if (this.session != null) {
                    this.session.close();
                    this.session = null;
                }
                if (this.connection != null) {
                    this.connection.stop();
                    this.connection.close();
                    this.connection = null;
                }
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }
    }

    private static class UnboundedJmsSource
    extends UnboundedSource<JmsRecord, JmsCheckpointMark> {
        private final ConnectionFactory connectionFactory;
        private final String queue;
        private final String topic;

        public UnboundedJmsSource(ConnectionFactory connectionFactory, String queue, String topic) {
            this.connectionFactory = connectionFactory;
            this.queue = queue;
            this.topic = topic;
        }

        public List<UnboundedJmsSource> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
            ArrayList<UnboundedJmsSource> sources = new ArrayList<UnboundedJmsSource>();
            for (int i = 0; i < desiredNumSplits; ++i) {
                sources.add(new UnboundedJmsSource(this.connectionFactory, this.queue, this.topic));
            }
            return sources;
        }

        public UnboundedJmsReader createReader(PipelineOptions options, JmsCheckpointMark checkpointMark) {
            return new UnboundedJmsReader(this, checkpointMark);
        }

        public void validate() {
            Preconditions.checkNotNull((Object)this.connectionFactory, (Object)"ConnectionFactory is not defined");
            Preconditions.checkArgument((this.queue != null || this.topic != null ? 1 : 0) != 0, (Object)"Either queue or topic is not defined");
        }

        public Coder getCheckpointMarkCoder() {
            return AvroCoder.of(JmsCheckpointMark.class);
        }

        public Coder<JmsRecord> getDefaultOutputCoder() {
            return SerializableCoder.of(JmsRecord.class);
        }
    }

    public static class Read
    extends PTransform<PBegin, PCollection<JmsRecord>> {
        protected ConnectionFactory connectionFactory;
        @Nullable
        protected String queue;
        @Nullable
        protected String topic;
        protected long maxNumRecords;
        protected Duration maxReadTime;

        public Read withConnectionFactory(ConnectionFactory connectionFactory) {
            return new Read(connectionFactory, this.queue, this.topic, this.maxNumRecords, this.maxReadTime);
        }

        public Read withQueue(String queue) {
            return new Read(this.connectionFactory, queue, this.topic, this.maxNumRecords, this.maxReadTime);
        }

        public Read withTopic(String topic) {
            return new Read(this.connectionFactory, this.queue, topic, this.maxNumRecords, this.maxReadTime);
        }

        public Read withMaxNumRecords(long maxNumRecords) {
            return new Read(this.connectionFactory, this.queue, this.topic, maxNumRecords, this.maxReadTime);
        }

        public Read withMaxReadTime(Duration maxReadTime) {
            return new Read(this.connectionFactory, this.queue, this.topic, this.maxNumRecords, maxReadTime);
        }

        public PCollection<JmsRecord> apply(PBegin input) {
            Read.Unbounded unbounded;
            Read.Unbounded transform = unbounded = org.apache.beam.sdk.io.Read.from(this.createSource());
            if (this.maxNumRecords != Long.MAX_VALUE) {
                transform = unbounded.withMaxNumRecords(this.maxNumRecords);
            } else if (this.maxReadTime != null) {
                transform = unbounded.withMaxReadTime(this.maxReadTime);
            }
            return (PCollection)input.getPipeline().apply((PTransform)transform);
        }

        public void validate(PBegin input) {
            Preconditions.checkNotNull((Object)this.connectionFactory, (Object)"ConnectionFactory not specified");
            Preconditions.checkArgument((this.queue != null || this.topic != null ? 1 : 0) != 0, (Object)"Either queue or topic not specified");
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item((String)"queue", (String)this.queue));
            builder.addIfNotNull(DisplayData.item((String)"topic", (String)this.topic));
        }

        private Read() {
        }

        private Read(ConnectionFactory connectionFactory, String queue, String topic, long maxNumRecords, Duration maxReadTime) {
            super("JmsIO.Read");
            this.connectionFactory = connectionFactory;
            this.queue = queue;
            this.topic = topic;
            this.maxNumRecords = maxNumRecords;
            this.maxReadTime = maxReadTime;
        }

        @VisibleForTesting
        UnboundedSource<JmsRecord, JmsCheckpointMark> createSource() {
            return new UnboundedJmsSource(this.connectionFactory, this.queue, this.topic);
        }
    }
}

