/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;

public class RewriteFileIndexSource
implements Source<ManifestEntry, Split, CheckpointState> {
    private static final long serialVersionUID = 1L;
    private final FileStoreTable table;
    @Nullable
    private final Predicate partitionPredicate;

    public RewriteFileIndexSource(FileStoreTable table, @Nullable Predicate partitionPredicate) {
        this.table = table;
        this.partitionPredicate = partitionPredicate;
    }

    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    public SplitEnumerator<Split, CheckpointState> createEnumerator(SplitEnumeratorContext<Split> splitEnumeratorContext) throws Exception {
        List<ManifestEntry> manifestEntries = this.table.store().newScan().withPartitionFilter(this.partitionPredicate).plan().files();
        return new ManifestFileSplitEnumerator(splitEnumeratorContext, manifestEntries.stream().map(Split::new).collect(Collectors.toList()));
    }

    public SplitEnumerator<Split, CheckpointState> restoreEnumerator(SplitEnumeratorContext<Split> splitEnumeratorContext, CheckpointState checkpointState) throws Exception {
        return new ManifestFileSplitEnumerator(splitEnumeratorContext, checkpointState.files());
    }

    public SimpleVersionedSerializer<Split> getSplitSerializer() {
        return new SplitSerder();
    }

    public SimpleVersionedSerializer<CheckpointState> getEnumeratorCheckpointSerializer() {
        return new CheckpointSerde();
    }

    public SourceReader<ManifestEntry, Split> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        return new Reader(sourceReaderContext);
    }

    private static class Reader
    implements SourceReader<ManifestEntry, Split> {
        private final SourceReaderContext context;
        private final ArrayDeque<Split> splits;
        private boolean noMore;

        public Reader(SourceReaderContext sourceReaderContext) {
            this.context = sourceReaderContext;
            this.splits = new ArrayDeque();
        }

        public void start() {
            this.context.sendSplitRequest();
        }

        public InputStatus pollNext(ReaderOutput<ManifestEntry> readerOutput) throws Exception {
            if (!this.splits.isEmpty()) {
                readerOutput.collect((Object)this.splits.poll().entry());
                if (!this.noMore && this.splits.isEmpty()) {
                    this.context.sendSplitRequest();
                }
                if (!this.splits.isEmpty()) {
                    return InputStatus.MORE_AVAILABLE;
                }
            }
            return this.noMore ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
        }

        public List<Split> snapshotState(long l) {
            return new ArrayList<Split>(this.splits);
        }

        public CompletableFuture<Void> isAvailable() {
            return this.splits.isEmpty() ? CompletableFuture.completedFuture(null) : FutureCompletingBlockingQueue.AVAILABLE;
        }

        public void addSplits(List<Split> list) {
            this.splits.addAll(list);
        }

        public void notifyNoMoreSplits() {
            this.noMore = true;
        }

        public void close() throws Exception {
        }
    }

    private static class CheckpointSerde
    implements SimpleVersionedSerializer<CheckpointState> {
        private final SplitSerder splitSerder = new SplitSerder();

        private CheckpointSerde() {
        }

        public int getVersion() {
            return 0;
        }

        public byte[] serialize(CheckpointState checkpointState) throws IOException {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            DataOutputStream dataOutput = new DataOutputStream(byteArrayOutputStream);
            List<Split> files = checkpointState.files();
            dataOutput.writeInt(files.size());
            for (Split file : files) {
                byte[] b = this.splitSerder.serialize(file);
                dataOutput.writeInt(b.length);
                dataOutput.write(b);
            }
            return new byte[0];
        }

        public CheckpointState deserialize(int i, byte[] bytes) throws IOException {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
            DataInputStream dataInput = new DataInputStream(byteArrayInputStream);
            int size = dataInput.readInt();
            ArrayList<Split> files = new ArrayList<Split>();
            for (int j = 0; j < size; ++j) {
                byte[] b = new byte[dataInput.readInt()];
                dataInput.readFully(b);
                files.add(this.splitSerder.deserialize(0, b));
            }
            return new CheckpointState(files);
        }
    }

    private static class SplitSerder
    implements SimpleVersionedSerializer<Split> {
        private static final ManifestEntrySerializer manifestEntrySerializer = new ManifestEntrySerializer();

        public int getVersion() {
            return 0;
        }

        public byte[] serialize(Split sourceSplit) throws IOException {
            return manifestEntrySerializer.serializeToBytes(sourceSplit.entry());
        }

        public Split deserialize(int i, byte[] bytes) throws IOException {
            return new Split((ManifestEntry)manifestEntrySerializer.deserializeFromBytes(bytes));
        }
    }

    public static class Split
    implements SourceSplit {
        private final ManifestEntry manifestEntry;

        public Split(ManifestEntry manifestEntry) {
            this.manifestEntry = manifestEntry;
        }

        public String splitId() {
            return "splitId";
        }

        ManifestEntry entry() {
            return this.manifestEntry;
        }
    }

    private static class ManifestFileSplitEnumerator
    implements SplitEnumerator<Split, CheckpointState> {
        private final SplitEnumeratorContext<Split> splitEnumeratorContext;
        private final List<Split> files;

        public ManifestFileSplitEnumerator(SplitEnumeratorContext<Split> splitEnumeratorContext, List<Split> files) {
            this.splitEnumeratorContext = splitEnumeratorContext;
            this.files = files;
        }

        public void start() {
        }

        public void handleSplitRequest(int i, @Nullable String s) {
            if (!this.files.isEmpty()) {
                this.splitEnumeratorContext.assignSplit((SourceSplit)this.files.remove(0), i);
            } else {
                this.splitEnumeratorContext.signalNoMoreSplits(i);
            }
        }

        public void addSplitsBack(List<Split> list, int i) {
            this.files.addAll(list);
        }

        public void addReader(int i) {
        }

        public CheckpointState snapshotState(long l) throws Exception {
            return new CheckpointState(this.files);
        }

        public void close() throws IOException {
        }
    }

    public static class CheckpointState {
        private final List<Split> files;

        public CheckpointState(List<Split> files) {
            this.files = files;
        }

        public List<Split> files() {
            return this.files;
        }
    }
}

