/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.system;

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;

public class ChangelogSSPIterator {
    private final SystemConsumer systemConsumer;
    private final String endOffset;
    private final SystemAdmin admin;
    private final Set<SystemStreamPartition> fetchSet;
    private final boolean trimEnabled;
    private Queue<IncomingMessageEnvelope> peeks;
    private Mode mode = Mode.RESTORE;

    public ChangelogSSPIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition, String endOffset, SystemAdmin admin, boolean trimEnabled) {
        this.systemConsumer = systemConsumer;
        this.endOffset = endOffset;
        this.trimEnabled = trimEnabled;
        if (this.trimEnabled && endOffset == null) {
            this.mode = Mode.TRIM;
        }
        this.admin = admin;
        this.fetchSet = new HashSet<SystemStreamPartition>();
        this.fetchSet.add(systemStreamPartition);
        this.peeks = new ArrayDeque<IncomingMessageEnvelope>();
    }

    public boolean hasNext() {
        this.refresh();
        return this.peeks.size() > 0;
    }

    public IncomingMessageEnvelope next() {
        this.refresh();
        if (this.peeks.size() == 0) {
            throw new NoSuchElementException();
        }
        IncomingMessageEnvelope envelope = this.peeks.poll();
        if (this.trimEnabled && (this.endOffset == null || this.admin.offsetComparator(envelope.getOffset(), this.endOffset) > 0)) {
            this.mode = Mode.TRIM;
        }
        return envelope;
    }

    public Mode getMode() {
        return this.mode;
    }

    private void refresh() {
        if (this.peeks.size() == 0) {
            try {
                Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = this.systemConsumer.poll(this.fetchSet, -1L);
                for (List<IncomingMessageEnvelope> systemStreamPartitionEnvelopes : envelopes.values()) {
                    this.peeks.addAll(systemStreamPartitionEnvelopes);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new SamzaException(e);
            }
        }
    }

    public static enum Mode {
        RESTORE,
        TRIM;

    }
}

