/*
 * Decompiled with CFR 0.152.
 */
package com.solace.connector.beam;

import com.google.common.annotations.VisibleForTesting;
import com.solace.connector.beam.MsgBusSempUtil;
import com.solace.connector.beam.SolaceCheckpointMark;
import com.solace.connector.beam.SolaceReaderStats;
import com.solace.connector.beam.UnboundedSolaceSource;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.Endpoint;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.FlowReceiver;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Queue;
import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.io.UnboundedSource;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@VisibleForTesting
class UnboundedSolaceReader<T>
extends UnboundedSource.UnboundedReader<T> {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSolaceReader.class);
    AtomicBoolean active = new AtomicBoolean(true);
    private final UnboundedSolaceSource<T> source;
    private JCSMPSession session;
    protected FlowReceiver flowReceiver;
    private MsgBusSempUtil msgBusSempUtil;
    private boolean useSenderTimestamp;
    private String clientName;
    private T current;
    private Instant currentTimestamp;
    private final EndpointProperties endpointProps = new EndpointProperties();
    final SolaceReaderStats readerStats;
    private ConsumerFlowProperties flow_prop = new ConsumerFlowProperties();
    AtomicLong watermark = new AtomicLong(0L);
    private AtomicBoolean isActive = new AtomicBoolean(true);
    private AtomicBoolean endMonitor = new AtomicBoolean(false);
    private ActivityMonitor<T> activityMonitor;
    private static final long statsPeriodMs = 120000L;
    private final java.util.Queue<Message> wait4cpQueue = new LinkedList<Message>();

    public UnboundedSolaceReader(UnboundedSolaceSource<T> source) {
        this.source = source;
        this.current = null;
        this.watermark.getAndSet(System.currentTimeMillis());
        this.readerStats = new SolaceReaderStats();
    }

    public void setUp() throws IOException {
        try {
            if (this.msgBusSempUtil == null) {
                JCSMPProperties properties = this.source.getSpec().jcsmpProperties();
                this.session = JCSMPFactory.onlyInstance().createSession(properties);
                this.clientName = (String)this.session.getProperty("client_name");
                this.session.connect();
                this.msgBusSempUtil = new MsgBusSempUtil(this.session);
                this.msgBusSempUtil.start();
            }
        }
        catch (Exception ex) {
            String msg = String.format("Failed to start UnboundSolaceReader for Solace session %s for queue: %s", this.clientName, this.source.getQueueName());
            LOG.error(msg, (Throwable)ex);
            this.session.closeSession();
            throw new IOException(msg, ex);
        }
    }

    public boolean start() throws IOException {
        this.setUp();
        try {
            if (this.flowReceiver == null) {
                Queue queue = JCSMPFactory.onlyInstance().createQueue(this.source.getQueueName());
                this.flow_prop.setEndpoint((Endpoint)queue);
                this.flow_prop.setAckMode("client_ack");
                this.useSenderTimestamp = this.source.getSpec().useSenderTimestamp();
                this.readerStats.setLastReportTime(Instant.now());
                this.flowReceiver = this.session.createFlow(null, this.flow_prop, this.endpointProps);
                this.flowReceiver.start();
                LOG.info("Binding Solace session [{}] to queue[{}]...", (Object)this.clientName, (Object)this.source.getQueueName());
            }
            if (this.activityMonitor == null) {
                this.activityMonitor = new ActivityMonitor(this, this.source.getSpec().advanceTimeoutInMillis());
                this.activityMonitor.start();
            }
            return this.advance();
        }
        catch (Exception ex) {
            String msg = String.format("Failed to start UnboundSolaceReader for Solace session %s for queue: %s", this.clientName, this.source.getQueueName());
            LOG.error(msg, (Throwable)ex);
            throw new IOException(msg, ex);
        }
    }

    public boolean advance() throws IOException {
        LOG.trace("Advancing Solace session [{}] on queue [{}]...", (Object)this.clientName, (Object)this.source.getQueueName());
        Instant timeNow = Instant.now();
        this.isActive.set(true);
        this.readerStats.setCurrentAdvanceTime(timeNow);
        long deltaTime = timeNow.getMillis() - this.readerStats.getLastReportTime().getMillis();
        if (deltaTime >= 120000L) {
            LOG.info("Stats for Queue [{}] : {} from client [{}]", new Object[]{this.source.getQueueName(), this.readerStats.dumpStatsAndClear(true), this.clientName});
            this.readerStats.setLastReportTime(timeNow);
        }
        try {
            Long solaceTime;
            BytesXMLMessage msg = this.flowReceiver.receive(this.source.getSpec().advanceTimeoutInMillis());
            if (msg == null) {
                this.readerStats.incrementEmptyPoll();
                return false;
            }
            this.readerStats.incrementMessageReceived();
            this.current = this.source.getSpec().inboundMessageMapper().map(msg);
            this.currentTimestamp = this.useSenderTimestamp ? ((solaceTime = msg.getSenderTimestamp()) == null ? Instant.now() : new Instant(solaceTime.longValue())) : Instant.now();
            this.wait4cpQueue.add(new Message(msg, this.currentTimestamp));
        }
        catch (JCSMPException ex) {
            try {
                LOG.info("JCSMPException for from client [{}] : {}", (Object)this.clientName, (Object)ex.getMessage());
                this.flowReceiver.close();
                this.readerStats.incrementMessagesRemovedFromCheckpointQueue(this.wait4cpQueue.size());
                this.wait4cpQueue.clear();
                this.flowReceiver = this.session.createFlow(null, this.flow_prop, this.endpointProps);
                this.flowReceiver.start();
                this.readerStats.incrementPollFlowRebind();
            }
            catch (JCSMPException restartEx) {
                LOG.error("Unrecoverable JCSMPException for from client [{}] : {}", (Object)this.clientName, (Object)ex.getMessage());
                throw new IOException(restartEx);
            }
            return false;
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
        return true;
    }

    public void close() throws IOException {
        LOG.info("Close the Solace session [{}] on queue[{}]...", (Object)this.clientName, (Object)this.source.getQueueName());
        this.active.set(false);
        try {
            if (this.flowReceiver != null) {
                this.flowReceiver.close();
            }
            if (this.msgBusSempUtil != null) {
                this.msgBusSempUtil.close();
            }
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
        finally {
            if (this.session != null && !this.session.isClosed()) {
                this.session.closeSession();
            }
        }
    }

    public Instant getWatermark() {
        if (this.watermark == null) {
            return new Instant(0L);
        }
        return new Instant(this.watermark.get());
    }

    public UnboundedSource.CheckpointMark getCheckpointMark() {
        LinkedBlockingQueue<Message> ackQueue = new LinkedBlockingQueue<Message>();
        try {
            Message msg = this.wait4cpQueue.poll();
            while (msg != null) {
                ackQueue.put(msg);
                msg = this.wait4cpQueue.poll();
            }
        }
        catch (Exception e) {
            LOG.error("Got exception while putting into the blocking queue", (Throwable)e);
        }
        this.readerStats.setCurrentCheckpointTime(Instant.now());
        this.readerStats.incrCheckpointReadyMessages(Long.valueOf(ackQueue.size()));
        return new SolaceCheckpointMark(this, this.clientName, ackQueue);
    }

    public T getCurrent() {
        if (this.current == null) {
            throw new NoSuchElementException();
        }
        return this.current;
    }

    public Instant getCurrentTimestamp() {
        if (this.current == null) {
            throw new NoSuchElementException();
        }
        return this.currentTimestamp;
    }

    public UnboundedSolaceSource<T> getCurrentSource() {
        return this.source;
    }

    private long queryQueueBytes(String queueName, String vpnName) {
        LOG.debug("Enter queryQueueBytes() Queue: [{}] VPN: [{}]", (Object)queueName, (Object)vpnName);
        long queueBytes = -1L;
        String sempShowQueue = String.format("<rpc><show><queue><name>%s</name><vpn-name>%s</vpn-name></queue></show></rpc>", queueName, vpnName);
        String queryString = "/rpc-reply/rpc/show/queue/queues/queue/info/current-spool-usage-in-bytes";
        try {
            this.setUp();
            String queryResults = this.msgBusSempUtil.queryRouter(sempShowQueue, queryString);
            queueBytes = Long.parseLong(queryResults);
        }
        catch (JCSMPException e) {
            LOG.error("Encountered a JCSMPException querying queue depth", (Throwable)e);
            return -1L;
        }
        catch (Exception e) {
            LOG.error("Encountered a Parser Exception querying queue depth", (Throwable)e);
            return -1L;
        }
        return queueBytes;
    }

    public long getSplitBacklogBytes() {
        LOG.debug("Enter getSplitBacklogBytes()");
        long backlogBytes = this.queryQueueBytes(this.source.getQueueName(), this.source.getSpec().jcsmpProperties().getStringProperty("vpn_name"));
        if (backlogBytes == -1L) {
            LOG.error("getSplitBacklogBytes() unable to read bytes from: {}", (Object)this.source.getQueueName());
            return -1L;
        }
        this.readerStats.setCurrentBacklog(backlogBytes);
        LOG.debug("getSplitBacklogBytes() Reporting backlog bytes of: {} from queue {}", (Object)Long.toString(backlogBytes), (Object)this.source.getQueueName());
        return backlogBytes;
    }

    String getClientName() {
        return this.clientName;
    }

    protected void finalize() throws Throwable {
        this.endMonitor.set(true);
        if (this.session != null && !this.session.isClosed()) {
            this.session.closeSession();
        }
    }

    static class Message
    implements Serializable {
        private static final long serialVersionUID = 42L;
        BytesXMLMessage message;
        Instant time;

        public Message(BytesXMLMessage message, Instant time) {
            this.message = message;
            this.time = time;
        }
    }

    private static class ActivityMonitor<T>
    extends Thread {
        private UnboundedSolaceReader<T> reader;
        private int timeout;
        private static final int debounce = 300;

        protected ActivityMonitor(UnboundedSolaceReader<T> reader, int timeout) {
            this.reader = reader;
            this.timeout = timeout;
        }

        @Override
        public void run() {
            while (!((UnboundedSolaceReader)this.reader).endMonitor.get()) {
                try {
                    this.reader.readerStats.incrementMonitorChecks();
                    Thread.sleep(this.timeout * 300);
                    if (!((UnboundedSolaceReader)this.reader).isActive.get()) {
                        this.reader.flowReceiver.close();
                        this.reader.readerStats.incrementMonitorFlowClose();
                    }
                    ((UnboundedSolaceReader)this.reader).isActive.set(false);
                }
                catch (Exception ex) {
                    ex.printStackTrace();
                    throw new RuntimeException(ex);
                }
            }
        }
    }
}

