/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.system;

import com.google.common.collect.ImmutableSet;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;

public class BoundedSSPIterator
implements Iterator<IncomingMessageEnvelope> {
    protected final SystemAdmin admin;
    private final SystemConsumer systemConsumer;
    private final String endOffset;
    private final Set<SystemStreamPartition> fetchSet;
    private Queue<IncomingMessageEnvelope> peeks;

    public BoundedSSPIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition, String endOffset, SystemAdmin admin) {
        this.systemConsumer = systemConsumer;
        this.endOffset = endOffset;
        this.admin = admin;
        this.fetchSet = ImmutableSet.of((Object)systemStreamPartition);
        this.peeks = new ArrayDeque<IncomingMessageEnvelope>();
    }

    @Override
    public boolean hasNext() {
        this.refresh();
        return this.peeks.size() > 0 && (this.endOffset == null || this.admin.offsetComparator(this.peeks.peek().getOffset(), this.endOffset) <= 0);
    }

    @Override
    public IncomingMessageEnvelope next() {
        this.refresh();
        if (this.peeks.size() == 0 || this.endOffset != null && this.admin.offsetComparator(this.peeks.peek().getOffset(), this.endOffset) > 0) {
            throw new NoSuchElementException();
        }
        return this.peeks.poll();
    }

    private void refresh() {
        if (this.peeks.size() == 0) {
            try {
                Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = this.systemConsumer.poll(this.fetchSet, -1L);
                for (List<IncomingMessageEnvelope> systemStreamPartitionEnvelopes : envelopes.values()) {
                    this.peeks.addAll(systemStreamPartitionEnvelopes);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new SamzaException(e);
            }
        }
    }
}

