/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.operator;

import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.SingleThreadedIteratorSource;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source that emits {@link TableChange} events describing what was committed to an Iceberg table.
 *
 * <p>The work is done by a {@link TableChangeIterator}: every call to its {@code next()} refreshes
 * the table and merges the snapshots found between the current snapshot and the last snapshot it
 * has already reported into a single {@link TableChange}. The reader created by this source is
 * wrapped in a {@link RateLimitedSourceReader}, so how often the table is polled is governed by the
 * supplied {@link RateLimiterStrategy}.
 */
@Internal
public class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
    private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class);

    private final TableLoader tableLoader;
    private final RateLimiterStrategy rateLimiterStrategy;
    private final long maxReadBack;

    /**
     * Creates the monitor source.
     *
     * @param tableLoader used to open and load the table that is monitored; must not be null
     * @param rateLimiterStrategy strategy used to create the rate limiter of the reader; must not
     *     be null
     * @param maxReadBack maximum number of snapshots that are walked back from the current
     *     snapshot in a single check; must be positive
     * @throws NullPointerException if {@code tableLoader} or {@code rateLimiterStrategy} is null
     * @throws IllegalArgumentException if {@code maxReadBack} is not positive
     */
    public MonitorSource(TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long maxReadBack) {
        Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
        Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should not be null");
        Preconditions.checkArgument(maxReadBack > 0, "Need to read at least 1 snapshot to work");
        this.tableLoader = tableLoader;
        this.rateLimiterStrategy = rateLimiterStrategy;
        this.maxReadBack = maxReadBack;
    }

    /** Returns {@link Boundedness#CONTINUOUS_UNBOUNDED}: the source never reports end of input. */
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /** Returns the type information of the emitted {@link TableChange} records. */
    public TypeInformation<TableChange> getProducedType() {
        return TypeInformation.of(TableChange.class);
    }

    /** Creates a fresh iterator which has not reported any snapshot yet. */
    @Override
    Iterator<TableChange> createIterator() {
        return new TableChangeIterator(tableLoader, null, maxReadBack);
    }

    /** Creates the serializer used to store and restore the position of the iterator. */
    @Override
    SimpleVersionedSerializer<Iterator<TableChange>> iteratorSerializer() {
        return new TableChangeIteratorSerializer(tableLoader, maxReadBack);
    }

    /**
     * Wraps the reader created by the parent class into a rate limited reader.
     *
     * @param readerContext context handed over to the parent's {@code createReader}
     * @return the rate limited reader
     * @throws Exception if the parent class fails to create the underlying reader
     */
    @Override
    public SourceReader<TableChange, SingleThreadedIteratorSource.GlobalSplit<TableChange>> createReader(SourceReaderContext readerContext) throws Exception {
        // NOTE(review): the argument of createRateLimiter is presumably the parallelism the
        // limiter is sized for - confirm against the Flink RateLimiterStrategy documentation.
        RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(1);
        // Diamond instead of the raw type, so the result matches the generic return type
        // without an unchecked conversion.
        return new RateLimitedSourceReader<>(super.createReader(readerContext), rateLimiter);
    }

    /**
     * Never-ending iterator which, on every {@link #next()} call, reports the changes committed to
     * the table since the previous call.
     */
    @VisibleForTesting
    static class TableChangeIterator implements Iterator<TableChange> {
        // Snapshots with this operation are skipped when collecting the changes.
        // NOTE(review): presumably the value of Iceberg's DataOperations.REPLACE - confirm.
        private static final String REPLACE_OPERATION = "replace";

        // Id of the table's current snapshot at the last successful check; null if there was none.
        private Long lastSnapshotId;
        private final long maxReadBack;
        private final Table table;

        /**
         * Opens the loader and loads the table which will be monitored.
         *
         * @param tableLoader used to open and load the table
         * @param lastSnapshotId id of the snapshot which was already reported, or null to start
         *     without any previously seen snapshot
         * @param maxReadBack maximum number of snapshots walked back in a single check
         */
        TableChangeIterator(TableLoader tableLoader, Long lastSnapshotId, long maxReadBack) {
            this.lastSnapshotId = lastSnapshotId;
            this.maxReadBack = maxReadBack;
            // NOTE(review): the loader opened here is never closed by the code visible in this
            // class - confirm whether the owner of the iterator is expected to release it.
            tableLoader.open();
            this.table = tableLoader.loadTable();
        }

        /** Always returns true; when nothing changed, {@link #next()} returns an empty event. */
        @Override
        public boolean hasNext() {
            return true;
        }

        /**
         * Refreshes the table and collects the changes committed since the last reported snapshot.
         *
         * <p>Starting from the current snapshot the parent chain is followed until the last
         * reported snapshot is reached, the chain ends, a snapshot can not be found in the table
         * anymore, or {@code maxReadBack} snapshots were visited. Snapshots with the
         * {@code "replace"} operation are skipped, all the others are merged into the result.
         *
         * @return the merged changes; an empty {@link TableChange} if there is nothing new or if
         *     the check failed
         */
        @Override
        public TableChange next() {
            try {
                table.refresh();
                Snapshot currentSnapshot = table.currentSnapshot();
                Long current = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
                Long checking = current;
                TableChange event = TableChange.empty();
                long readBack = 0L;
                while (checking != null && !checking.equals(lastSnapshotId) && ++readBack <= maxReadBack) {
                    Snapshot snapshot = table.snapshot(checking);
                    if (snapshot == null) {
                        // The snapshot is not available in the table anymore, so the chain can
                        // not be followed any further.
                        break;
                    }

                    if (REPLACE_OPERATION.equals(snapshot.operation())) {
                        LOG.debug("Skipping replace snapshot {}", snapshot.snapshotId());
                    } else {
                        LOG.debug("Reading snapshot {}", snapshot.snapshotId());
                        event.merge(new TableChange(snapshot, table.io()));
                    }

                    checking = snapshot.parentId();
                }

                // Only reached when the whole check succeeded; after a failure the old position
                // is kept, so the next call starts from the same snapshot again.
                lastSnapshotId = current;
                return event;
            } catch (Exception e) {
                // Deliberately best effort: a failed check is logged and reported as "no change"
                // instead of failing the source.
                LOG.warn("Failed to fetch table changes for {}", table, e);
                return TableChange.empty();
            }
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                .add("lastSnapshotId", lastSnapshotId)
                .add("maxReadBack", maxReadBack)
                .add("table", table)
                .toString();
        }
    }

    /**
     * Serializer for {@link TableChangeIterator}. Only the id of the last reported snapshot is
     * written; the table loader and the read back limit are taken from the serializer itself when
     * the iterator is recreated.
     */
    private static final class TableChangeIteratorSerializer implements SimpleVersionedSerializer<Iterator<TableChange>> {
        private static final int CURRENT_VERSION = 1;
        // Value written instead of the snapshot id when the iterator has no last snapshot.
        private static final long NO_SNAPSHOT_ID = -1L;

        private final TableLoader tableLoader;
        private final long maxReadBack;

        TableChangeIteratorSerializer(TableLoader tableLoader, long maxReadBack) {
            this.tableLoader = tableLoader;
            this.maxReadBack = maxReadBack;
        }

        /** Returns the version of the format written by {@link #serialize(Iterator)}. */
        public int getVersion() {
            return CURRENT_VERSION;
        }

        /**
         * Writes the last reported snapshot id of the iterator as a single long.
         *
         * @param iterator the iterator to store; has to be a {@link TableChangeIterator}
         * @return the serialized position
         * @throws IllegalArgumentException if the iterator is not a {@link TableChangeIterator}
         * @throws IOException if writing the value fails
         */
        public byte[] serialize(Iterator<TableChange> iterator) throws IOException {
            Preconditions.checkArgument(
                iterator instanceof TableChangeIterator,
                "Use TableChangeIterator iterator. Found incompatible type: %s",
                iterator.getClass());
            TableChangeIterator tableChangeIterator = (TableChangeIterator) iterator;
            // A long takes exactly 8 bytes, so the initial buffer never has to grow.
            DataOutputSerializer out = new DataOutputSerializer(Long.BYTES);
            long toStore = tableChangeIterator.lastSnapshotId != null ? tableChangeIterator.lastSnapshotId : NO_SNAPSHOT_ID;
            out.writeLong(toStore);
            return out.getCopyOfBuffer();
        }

        /**
         * Recreates the iterator from the stored snapshot id.
         *
         * @param version version of the format the data was written with
         * @param serialized the data created by {@link #serialize(Iterator)}
         * @return a new iterator continuing from the stored snapshot id
         * @throws IOException if the version is not known or the data can not be read
         */
        public TableChangeIterator deserialize(int version, byte[] serialized) throws IOException {
            if (version != CURRENT_VERSION) {
                throw new IOException("Unrecognized version or corrupt state: " + version);
            }

            DataInputDeserializer in = new DataInputDeserializer(serialized);
            long fromStore = in.readLong();
            Long restoredSnapshotId = fromStore != NO_SNAPSHOT_ID ? fromStore : null;
            return new TableChangeIterator(tableLoader, restoredSnapshotId, maxReadBack);
        }
    }
}

