/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink;

import io.pravega.client.BatchClientFactory;
import io.pravega.client.ClientConfig;
import io.pravega.client.batch.SegmentIterator;
import io.pravega.client.batch.SegmentRange;
import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.AbstractReaderBuilder;
import io.pravega.connectors.flink.PravegaInputSplit;
import io.pravega.connectors.flink.serialization.DeserializerFromSchemaRegistry;
import io.pravega.connectors.flink.serialization.PravegaDeserializationSchema;
import io.pravega.connectors.flink.serialization.WrappingSerializer;
import io.pravega.connectors.flink.util.FlinkPravegaUtils;
import io.pravega.connectors.flink.util.StreamWithBoundaries;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkPravegaInputFormat<T>
extends RichInputFormat<T, PravegaInputSplit> {
    private static final Logger log = LoggerFactory.getLogger(FlinkPravegaInputFormat.class);
    private static final long serialVersionUID = 1L;
    private static final String DEFAULT_CLIENT_SCOPE_NAME = "__NOT_USED";
    private final ClientConfig clientConfig;
    private final String clientScope;
    private final List<StreamWithBoundaries> streams;
    private final DeserializationSchema<T> deserializationSchema;
    private transient BatchClientFactory batchClientFactory;
    private transient SegmentIterator<T> segmentIterator;

    public FlinkPravegaInputFormat(ClientConfig clientConfig, List<StreamWithBoundaries> streams, DeserializationSchema<T> deserializationSchema) {
        this.clientConfig = (ClientConfig)Preconditions.checkNotNull((Object)clientConfig, (String)"clientConfig");
        this.clientScope = DEFAULT_CLIENT_SCOPE_NAME;
        this.streams = (List)Preconditions.checkNotNull(streams, (String)"streams");
        this.deserializationSchema = (DeserializationSchema)Preconditions.checkNotNull(deserializationSchema, (String)"deserializationSchema");
    }

    public void openInputFormat() throws IOException {
        super.openInputFormat();
        this.batchClientFactory = this.getBatchClientFactory(this.clientScope, this.clientConfig);
    }

    @VisibleForTesting
    protected BatchClientFactory getBatchClientFactory(String clientScope, ClientConfig clientConfig) {
        return BatchClientFactory.withScope(clientScope, clientConfig);
    }

    public void closeInputFormat() throws IOException {
        this.batchClientFactory.close();
    }

    public void configure(Configuration parameters) {
    }

    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    public PravegaInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        ArrayList<PravegaInputSplit> splits = new ArrayList<PravegaInputSplit>();
        try (BatchClientFactory batchClientFactory = this.getBatchClientFactory(this.clientScope, this.clientConfig);){
            for (StreamWithBoundaries stream : this.streams) {
                Iterator<SegmentRange> segmentRangeIterator = batchClientFactory.getSegments(stream.getStream(), stream.getFrom(), stream.getTo()).getIterator();
                while (segmentRangeIterator.hasNext()) {
                    splits.add(new PravegaInputSplit(splits.size(), segmentRangeIterator.next()));
                }
            }
        }
        log.info("Prepared {} input splits", (Object)splits.size());
        return splits.toArray(new PravegaInputSplit[splits.size()]);
    }

    public InputSplitAssigner getInputSplitAssigner(PravegaInputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner((InputSplit[])inputSplits);
    }

    public void open(PravegaInputSplit split) throws IOException {
        Serializer<T> deserializer = this.deserializationSchema instanceof WrappingSerializer ? ((WrappingSerializer)this.deserializationSchema).getWrappedSerializer() : new FlinkPravegaUtils.FlinkDeserializer<T>(this.deserializationSchema);
        this.segmentIterator = this.batchClientFactory.readSegment(split.getSegmentRange(), deserializer);
    }

    public boolean reachedEnd() throws IOException {
        return !this.segmentIterator.hasNext();
    }

    public T nextRecord(T t) throws IOException {
        return (T)this.segmentIterator.next();
    }

    public void close() throws IOException {
        this.segmentIterator.close();
    }

    public static <T> Builder<T> builder() {
        return new Builder();
    }

    public static class Builder<T>
    extends AbstractReaderBuilder<Builder<T>> {
        private DeserializationSchema<T> deserializationSchema;

        @Override
        protected Builder<T> builder() {
            return this;
        }

        public Builder<T> withDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
            return this.builder();
        }

        public Builder<T> withDeserializationSchemaFromRegistry(String groupId, Class<T> tClass) {
            this.deserializationSchema = new PravegaDeserializationSchema<T>(tClass, new DeserializerFromSchemaRegistry<T>(this.getPravegaConfig(), groupId, tClass));
            return this.builder();
        }

        protected DeserializationSchema<T> getDeserializationSchema() {
            Preconditions.checkState((this.deserializationSchema != null ? 1 : 0) != 0, (Object)"Deserialization schema must not be null.");
            return this.deserializationSchema;
        }

        public FlinkPravegaInputFormat<T> build() {
            return new FlinkPravegaInputFormat<T>(this.getPravegaConfig().getClientConfig(), this.resolveStreams(), this.getDeserializationSchema());
        }
    }
}

