/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.system;

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;

/**
 * Iterator over the {@link IncomingMessageEnvelope}s that a {@link SystemConsumer}
 * returns for a single {@link SystemStreamPartition}.
 *
 * <p>Envelopes are buffered in an internal queue. Whenever the queue is empty,
 * {@link #hasNext()} and {@link #next()} poll the consumer to refill it. Once
 * {@link #next()} has handed out an envelope whose {@code isEndOfStream()} is
 * {@code true}, the consumer is no longer polled; any envelopes still buffered
 * at that point are still returned.
 *
 * <p>This class performs no synchronization of its own.
 */
public class SystemStreamPartitionIterator
implements Iterator<IncomingMessageEnvelope> {
    /** Consumer that is polled for new envelopes. */
    private final SystemConsumer systemConsumer;
    /** Single-element set holding the partition this iterator reads; passed to every poll. */
    private final Set<SystemStreamPartition> fetchSet;
    /** Envelopes that have been fetched from the consumer but not yet returned by {@link #next()}. */
    private final Queue<IncomingMessageEnvelope> peeks;
    /** Set once {@link #next()} has returned an end-of-stream envelope; suppresses further polling. */
    private boolean endOfStreamReached = false;

    /**
     * Creates an iterator for the given partition, delegating to the three-argument
     * constructor with a fetch size of 1000.
     *
     * @param systemConsumer consumer to poll for envelopes
     * @param systemStreamPartition partition whose envelopes are iterated
     */
    public SystemStreamPartitionIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition) {
        this(systemConsumer, systemStreamPartition, 1000);
    }

    /**
     * Creates an iterator for the given partition.
     *
     * @param systemConsumer consumer to poll for envelopes
     * @param systemStreamPartition partition whose envelopes are iterated
     * @param fetchSize accepted for interface compatibility; this implementation
     *     does not read it
     */
    public SystemStreamPartitionIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition, int fetchSize) {
        this.systemConsumer = systemConsumer;
        this.fetchSet = new HashSet<SystemStreamPartition>();
        this.fetchSet.add(systemStreamPartition);
        this.peeks = new ArrayDeque<IncomingMessageEnvelope>();
    }

    /**
     * Reports whether another envelope is available, polling the consumer first
     * if the buffer is empty and end-of-stream has not yet been returned.
     *
     * @return {@code true} if at least one envelope is buffered after the refresh
     * @throws SamzaException if the poll is interrupted
     */
    @Override
    public boolean hasNext() {
        this.refresh();
        return !this.peeks.isEmpty();
    }

    /**
     * Returns the next buffered envelope, polling the consumer first if the buffer
     * is empty and end-of-stream has not yet been returned.
     *
     * @return the next envelope
     * @throws NoSuchElementException if no envelope is available after the refresh
     * @throws SamzaException if the poll is interrupted
     */
    @Override
    public IncomingMessageEnvelope next() {
        this.refresh();
        if (this.peeks.isEmpty()) {
            throw new NoSuchElementException();
        }
        IncomingMessageEnvelope envelope = this.peeks.poll();
        if (envelope.isEndOfStream()) {
            // Remember that the stream is finished so refresh() stops polling.
            this.endOfStreamReached = true;
        }
        return envelope;
    }

    /**
     * Removal is not supported: this iterator only reads from the consumer.
     *
     * @throws UnsupportedOperationException always, as the {@link Iterator}
     *     contract requires for iterators that cannot remove elements
     */
    @Override
    public void remove() {
        throw new UnsupportedOperationException("remove() is not supported by SystemStreamPartitionIterator");
    }

    /**
     * Refills the buffer from the consumer when it is empty and end-of-stream has
     * not been returned yet; otherwise does nothing.
     *
     * @throws SamzaException wrapping the {@link InterruptedException} if the poll
     *     is interrupted; the thread's interrupt flag is restored first
     */
    private void refresh() {
        if (!this.peeks.isEmpty() || this.endOfStreamReached) {
            return;
        }
        try {
            // NOTE(review): a timeout of -1 presumably asks the consumer to block until
            // messages are available - confirm against the SystemConsumer.poll contract.
            Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = this.systemConsumer.poll(this.fetchSet, -1L);
            for (List<IncomingMessageEnvelope> systemStreamPartitionEnvelopes : envelopes.values()) {
                this.peeks.addAll(systemStreamPartitionEnvelopes);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new SamzaException(e);
        }
    }
}

