/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.jms;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Message;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

@DefaultCoder(value=AvroCoder.class)
public class JmsCheckpointMark
implements UnboundedSource.CheckpointMark {
    private final List<Message> messages = new ArrayList<Message>();
    private Instant oldestPendingTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

    protected List<Message> getMessages() {
        return this.messages;
    }

    protected void addMessage(Message message) throws Exception {
        Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
        if (currentMessageTimestamp.isBefore((ReadableInstant)this.oldestPendingTimestamp)) {
            this.oldestPendingTimestamp = currentMessageTimestamp;
        }
        this.messages.add(message);
    }

    protected Instant getOldestPendingTimestamp() {
        return this.oldestPendingTimestamp;
    }

    public void finalizeCheckpoint() {
        for (Message message : this.messages) {
            try {
                message.acknowledge();
                Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
                if (!currentMessageTimestamp.isAfter((ReadableInstant)this.oldestPendingTimestamp)) continue;
                this.oldestPendingTimestamp = currentMessageTimestamp;
            }
            catch (Exception exception) {}
        }
        this.messages.clear();
    }
}

