/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.jms;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.jms.Message;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.joda.time.base.AbstractInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefaultCoder(value=AvroCoder.class)
public class JmsCheckpointMark
implements UnboundedSource.CheckpointMark {
    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
    private final State state = new State();

    protected List<Message> getMessages() {
        return this.state.getMessages();
    }

    protected void addMessage(Message message) throws Exception {
        Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
        this.state.atomicWrite(() -> {
            this.state.updateOldestPendingTimestampIf(currentMessageTimestamp, AbstractInstant::isBefore);
            this.state.addMessage(message);
        });
    }

    protected Instant getOldestPendingTimestamp() {
        return this.state.getOldestPendingTimestamp();
    }

    public void finalizeCheckpoint() {
        State snapshot = this.state.snapshot();
        for (Message message : snapshot.messages) {
            try {
                message.acknowledge();
                Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
                snapshot.updateOldestPendingTimestampIf(currentMessageTimestamp, AbstractInstant::isAfter);
            }
            catch (Exception e) {
                LOG.error("Exception while finalizing message: {}", (Throwable)e);
            }
        }
        this.state.atomicWrite(() -> {
            this.state.removeMessages(snapshot.messages);
            this.state.updateOldestPendingTimestampIf(snapshot.oldestPendingTimestamp, AbstractInstant::isAfter);
        });
    }

    private class State {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private final List<Message> messages;
        private Instant oldestPendingTimestamp;

        public State() {
            this(new ArrayList<Message>(), BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        private State(List<Message> messages, Instant oldestPendingTimestamp) {
            this.messages = messages;
            this.oldestPendingTimestamp = oldestPendingTimestamp;
        }

        public State snapshot() {
            return this.atomicRead(() -> new State(new ArrayList<Message>(this.messages), this.oldestPendingTimestamp));
        }

        public Instant getOldestPendingTimestamp() {
            return this.atomicRead(() -> this.oldestPendingTimestamp);
        }

        public List<Message> getMessages() {
            return this.atomicRead(() -> this.messages);
        }

        public void addMessage(Message message) {
            this.atomicWrite(() -> this.messages.add(message));
        }

        public void removeMessages(List<Message> messages) {
            this.atomicWrite(() -> this.messages.removeAll(messages));
        }

        private void updateOldestPendingTimestampIf(Instant candidate, BiFunction<Instant, Instant, Boolean> check) {
            this.atomicWrite(() -> {
                if (((Boolean)check.apply(candidate, this.oldestPendingTimestamp)).booleanValue()) {
                    this.oldestPendingTimestamp = candidate;
                }
            });
        }

        public <T> T atomicRead(Supplier<T> operation) {
            this.lock.readLock().lock();
            try {
                T t = operation.get();
                return t;
            }
            finally {
                this.lock.readLock().unlock();
            }
        }

        public void atomicWrite(Runnable operation) {
            this.lock.writeLock().lock();
            try {
                operation.run();
            }
            finally {
                this.lock.writeLock().unlock();
            }
        }
    }
}

