/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.dataflow;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.ListCoder;
import com.google.cloud.dataflow.sdk.coders.NullableCoder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Lists;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.StringUtils;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataflowUnboundedReadFromBoundedSource<T>
extends PTransform<PInput, PCollection<T>> {
    private static final Logger LOG = LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
    private final BoundedSource<T> source;

    /**
     * Creates a transform that reads from the given bounded source.
     *
     * @param source the {@link BoundedSource} to read; stored as-is, no validation is done here
     */
    public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
        this.source = source;
    }

    /**
     * Wraps the bounded source in a {@link BoundedToUnboundedSourceAdapter} and applies
     * {@code Read.from(adapter)} to the pipeline that {@code input} belongs to.
     *
     * @param input the input whose pipeline receives the read transform
     * @return the collection produced by the applied read
     */
    @Override
    public PCollection<T> apply(PInput input) {
        BoundedToUnboundedSourceAdapter<T> adapter = new BoundedToUnboundedSourceAdapter<T>(this.source);
        return input.getPipeline().apply(Read.from(adapter));
    }

    /**
     * Returns the default output coder of the wrapped bounded source.
     */
    @Override
    protected Coder<T> getDefaultOutputCoder() {
        Coder<T> sourceCoder = this.source.getDefaultOutputCoder();
        return sourceCoder;
    }

    /**
     * Returns {@code "Read(<name>)"}, where {@code <name>} is the approximate simple name of the
     * wrapped source's class.
     */
    @Override
    public String getKindString() {
        String sourceName = StringUtils.approximateSimpleName(this.source.getClass());
        return "Read(" + sourceName + ")";
    }

    /**
     * Registers the wrapped source's class under the key {@code "source"} and then includes the
     * source's own display data.
     *
     * @param builder the display-data builder to populate
     */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
        Class<?> sourceClass = this.source.getClass();
        builder.add(DisplayData.item("source", sourceClass)).include(this.source);
    }

    @VisibleForTesting
    public static class BoundedToUnboundedSourceAdapter<T>
    extends UnboundedSource<T, Checkpoint<T>> {
        private BoundedSource<T> boundedSource;

        /**
         * Creates an adapter around the given bounded source.
         *
         * @param boundedSource the source this adapter delegates to; stored as-is
         */
        public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
            this.boundedSource = boundedSource;
        }

        /**
         * Validates this adapter by delegating to the wrapped bounded source's {@code validate()}.
         */
        @Override
        public void validate() {
            BoundedSource<T> delegate = this.boundedSource;
            delegate.validate();
        }

        /**
         * Splits the wrapped bounded source into bundles and wraps each bundle in its own adapter.
         *
         * <p>The desired bundle size is the source's estimated size divided by
         * {@code desiredNumSplits}. Splitting is best-effort: if the resulting bundle size is not
         * positive, if the source returns {@code null} from {@code splitIntoBundles}, or if any
         * exception is thrown along the way (including the division by zero that occurs when
         * {@code desiredNumSplits} is 0), a warning is logged and a single-element list containing
         * this adapter is returned instead.
         *
         * <p>The adapters are created once and returned in a concrete list. A lazy
         * {@code Lists.transform} view would build a new adapter on every element access and would
         * not be serializable, because the transforming function is not.
         *
         * @param desiredNumSplits the number of splits the caller would like
         * @param options pipeline options passed through to the bounded source
         * @return one adapter per bundle, or a list containing only this adapter when no split is made
         */
        @Override
        public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
            try {
                long desiredBundleSize = this.boundedSource.getEstimatedSizeBytes(options) / (long)desiredNumSplits;
                if (desiredBundleSize <= 0L) {
                    LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.", this.boundedSource);
                    return ImmutableList.of(this);
                }
                List<? extends BoundedSource<T>> splits = this.boundedSource.splitIntoBundles(desiredBundleSize, options);
                if (splits == null) {
                    LOG.warn("BoundedSource cannot split {}, skips the initial splits.", this.boundedSource);
                    return ImmutableList.of(this);
                }
                // Build the adapters eagerly so that every caller sees the same instances.
                List<BoundedToUnboundedSourceAdapter<T>> adapters = new ArrayList<BoundedToUnboundedSourceAdapter<T>>(splits.size());
                for (BoundedSource<T> split : splits) {
                    adapters.add(new BoundedToUnboundedSourceAdapter<T>(split));
                }
                return adapters;
            }
            catch (Exception e) {
                // Deliberate best-effort: any failure while splitting falls back to reading unsplit.
                LOG.warn("Exception while splitting {}, skips the initial splits.", this.boundedSource, e);
                return ImmutableList.of(this);
            }
        }

        /**
         * Creates a reader for this adapter.
         *
         * <p>Without a checkpoint the reader starts from the wrapped bounded source with no
         * buffered elements. With a checkpoint it resumes from the checkpoint's residual elements
         * and residual source.
         *
         * @param options pipeline options handed to the reader
         * @param checkpoint the checkpoint to resume from, or {@code null} to start fresh
         * @return a new {@link Reader}
         */
        @Override
        public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint) {
            List<TimestampedValue<T>> bufferedElements = null;
            BoundedSource<T> sourceToRead = this.boundedSource;
            if (checkpoint != null) {
                bufferedElements = checkpoint.getResidualElements();
                sourceToRead = checkpoint.getResidualSource();
            }
            return new Reader(bufferedElements, sourceToRead, options);
        }

        /**
         * Returns the default output coder of the wrapped bounded source.
         */
        @Override
        public Coder<T> getDefaultOutputCoder() {
            Coder<T> elementCoder = this.boundedSource.getDefaultOutputCoder();
            return elementCoder;
        }

        /**
         * Returns a {@link CheckpointCoder} built on the wrapped source's default output coder.
         */
        @Override
        public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
            Coder<T> elementCoder = this.boundedSource.getDefaultOutputCoder();
            return new CheckpointCoder<T>(elementCoder);
        }

        /**
         * Reads the remaining part of a {@link BoundedSource} through a lazily created
         * {@link BoundedSource.BoundedReader}, and can turn whatever has not been read yet into a
         * {@link Checkpoint}.
         *
         * <p>The underlying reader is created on the first {@link #advance()} call. Once it reports
         * that no more elements are available, later {@code advance()} calls return {@code false}
         * without touching the reader again.
         */
        private class ResidualSource {
            private final BoundedSource<T> residualSource;
            private final PipelineOptions options;
            @Nullable
            private BoundedSource.BoundedReader<T> reader;
            private boolean closed;
            /** Set once the reader's start()/advance() has returned false. */
            private boolean readerDone;

            /**
             * @param residualSource the source to read; must not be null
             * @param options pipeline options used to create the reader; must not be null
             * @throws NullPointerException if either argument is null
             */
            public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
                this.residualSource = Preconditions.checkNotNull(residualSource, "residualSource");
                this.options = Preconditions.checkNotNull(options, "options");
                this.reader = null;
                this.closed = false;
                this.readerDone = false;
            }

            /**
             * Moves to the next element, creating and starting the reader on first use.
             *
             * @return {@code true} if an element is available, {@code false} once the reader is exhausted
             * @throws IllegalStateException if called after {@link #close()}
             * @throws IOException if the underlying reader fails
             */
            private boolean advance() throws IOException {
                Preconditions.checkState(!this.closed, "advance() call on closed %s", this.getClass().getName());
                if (this.readerDone) {
                    // The enclosing unbounded reader is polled repeatedly; do not call advance()
                    // again on a bounded reader that already reported the end of its input.
                    return false;
                }
                if (this.reader == null) {
                    this.reader = this.residualSource.createReader(this.options);
                    this.readerDone = !this.reader.start();
                } else {
                    this.readerDone = !this.reader.advance();
                }
                return !this.readerDone;
            }

            /**
             * Returns the reader's current element.
             *
             * @throws NoSuchElementException if no reader is open
             */
            T getCurrent() throws NoSuchElementException {
                if (this.reader == null) {
                    throw new NoSuchElementException();
                }
                return this.reader.getCurrent();
            }

            /**
             * Returns the timestamp of the reader's current element.
             *
             * @throws NoSuchElementException if no reader is open
             */
            Instant getCurrentTimestamp() throws NoSuchElementException {
                if (this.reader == null) {
                    throw new NoSuchElementException();
                }
                return this.reader.getCurrentTimestamp();
            }

            /**
             * Closes the underlying reader, if one was opened, and marks this object as closed.
             *
             * @throws IOException if closing the reader fails
             */
            void close() throws IOException {
                if (this.reader != null) {
                    this.reader.close();
                    this.reader = null;
                }
                this.closed = true;
            }

            /** Returns the bounded source this object was created with. */
            BoundedSource<T> getSource() {
                return this.residualSource;
            }

            /**
             * Builds a checkpoint describing everything that has not been consumed yet.
             *
             * <p>If no reader is open, the checkpoint is just the whole residual source. Otherwise
             * the reader is asked to split off a residual source, trying up to 8 fractions spread
             * between the consumed fraction and 1. The elements that stay with the reader after the
             * split are then read to the end and stored in the checkpoint together with the split-off
             * source, which is {@code null} when no split succeeded.
             *
             * @return the checkpoint for the unread remainder
             * @throws RuntimeException wrapping any {@link IOException} raised while draining the reader
             */
            Checkpoint<T> getCheckpointMark() {
                if (this.reader == null) {
                    return new Checkpoint<T>(null, this.residualSource);
                }
                BoundedSource<T> residualSplit = null;
                Double fractionConsumed = this.reader.getFractionConsumed();
                if (fractionConsumed != null && 0.0 <= fractionConsumed && fractionConsumed <= 1.0) {
                    double fractionRest = 1.0 - fractionConsumed;
                    int splitAttempts = 8;
                    for (int i = 0; i < splitAttempts && residualSplit == null; ++i) {
                        double fractionToSplit = fractionConsumed + fractionRest * (double)i / (double)splitAttempts;
                        residualSplit = this.reader.splitAtFraction(fractionToSplit);
                    }
                }
                List<TimestampedValue<T>> newResidualElements = new ArrayList<TimestampedValue<T>>();
                try {
                    while (this.advance()) {
                        newResidualElements.add(TimestampedValue.of(this.reader.getCurrent(), this.reader.getCurrentTimestamp()));
                    }
                }
                catch (IOException e) {
                    throw new RuntimeException("Failed to read elements from the bounded reader.", e);
                }
                return new Checkpoint<T>(newResidualElements, residualSplit);
            }
        }

        /**
         * Cursor over a list of buffered timestamped elements.
         *
         * <p>The iterator over the list is created on the first {@link #advance()} call. The cursor
         * tracks whether it is positioned on an element and whether it has run off the end.
         */
        private class ResidualElements {
            private final List<TimestampedValue<T>> elementsList;
            @Nullable
            private Iterator<TimestampedValue<T>> elementsIterator;
            @Nullable
            private TimestampedValue<T> currentT;
            private boolean hasCurrent;
            private boolean done;

            /**
             * @param residualElementsList the elements to iterate over; must not be null
             * @throws NullPointerException if the list is null
             */
            ResidualElements(List<TimestampedValue<T>> residualElementsList) {
                this.elementsList = Preconditions.checkNotNull(residualElementsList, "residualElementsList");
                // Remaining fields keep their defaults: no iterator, no current element, not done.
            }

            /**
             * Moves to the next buffered element.
             *
             * @return {@code true} if positioned on an element; {@code false} if the list is
             *     exhausted, in which case the cursor is marked as done
             */
            public boolean advance() {
                if (this.elementsIterator == null) {
                    this.elementsIterator = this.elementsList.iterator();
                }
                boolean moved = this.elementsIterator.hasNext();
                if (moved) {
                    this.currentT = this.elementsIterator.next();
                } else {
                    this.done = true;
                }
                this.hasCurrent = moved;
                return moved;
            }

            /** Returns whether the cursor is currently positioned on an element. */
            boolean hasCurrent() {
                return this.hasCurrent;
            }

            /** Returns whether {@link #advance()} has reported the end of the list. */
            boolean done() {
                return this.done;
            }

            /**
             * Returns the element the cursor is positioned on.
             *
             * @throws NoSuchElementException if there is no current element
             */
            TimestampedValue<T> getCurrentTimestampedValue() {
                if (this.hasCurrent) {
                    return this.currentT;
                }
                throw new NoSuchElementException();
            }

            /** Returns the value of the current element. */
            T getCurrent() {
                TimestampedValue<T> current = this.getCurrentTimestampedValue();
                return current.getValue();
            }

            /** Returns the timestamp of the current element. */
            Instant getCurrentTimestamp() {
                TimestampedValue<T> current = this.getCurrentTimestampedValue();
                return current.getTimestamp();
            }

            /**
             * Returns the elements that have not been handed out yet.
             *
             * <p>If iteration never started, this is the original list itself. Otherwise the rest
             * of the iterator is drained into a new list.
             */
            List<TimestampedValue<T>> getRestElements() {
                if (this.elementsIterator == null) {
                    return this.elementsList;
                }
                return Lists.newArrayList(this.elementsIterator);
            }
        }

        @VisibleForTesting
        class Reader
        extends UnboundedSource.UnboundedReader<T> {
            private ResidualElements residualElements;
            @Nullable
            private ResidualSource residualSource;
            private final PipelineOptions options;
            private boolean done;

            Reader(@Nullable List<TimestampedValue<T>> residualElementsList, BoundedSource<T> residualSource, PipelineOptions options) {
                this.init(residualElementsList, residualSource, options);
                this.options = Preconditions.checkNotNull(options, "options");
                this.done = false;
            }

            private void init(@Nullable List<TimestampedValue<T>> residualElementsList, @Nullable BoundedSource<T> residualSource, PipelineOptions options) {
                this.residualElements = residualElementsList == null ? new ResidualElements(Collections.emptyList()) : new ResidualElements(residualElementsList);
                this.residualSource = residualSource == null ? null : new ResidualSource(residualSource, options);
            }

            @Override
            public boolean start() throws IOException {
                return this.advance();
            }

            @Override
            public boolean advance() throws IOException {
                if (this.residualElements.advance()) {
                    return true;
                }
                if (this.residualSource != null && this.residualSource.advance()) {
                    return true;
                }
                this.done = true;
                return false;
            }

            @Override
            public void close() throws IOException {
                if (this.residualSource != null) {
                    this.residualSource.close();
                }
            }

            @Override
            public T getCurrent() throws NoSuchElementException {
                if (this.residualElements.hasCurrent()) {
                    return this.residualElements.getCurrent();
                }
                if (this.residualSource != null) {
                    return this.residualSource.getCurrent();
                }
                throw new NoSuchElementException();
            }

            @Override
            public Instant getCurrentTimestamp() throws NoSuchElementException {
                if (this.residualElements.hasCurrent()) {
                    return this.residualElements.getCurrentTimestamp();
                }
                if (this.residualSource != null) {
                    return this.residualSource.getCurrentTimestamp();
                }
                throw new NoSuchElementException();
            }

            @Override
            public Instant getWatermark() {
                return this.done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
            }

            @Override
            public Checkpoint<T> getCheckpointMark() {
                Checkpoint newCheckpoint = !this.residualElements.done() ? new Checkpoint(this.residualElements.getRestElements(), this.residualSource == null ? null : this.residualSource.getSource()) : (this.residualSource != null ? this.residualSource.getCheckpointMark() : new Checkpoint(null, null));
                this.init(newCheckpoint.residualElements, newCheckpoint.residualSource, this.options);
                return newCheckpoint;
            }

            @Override
            public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
                return BoundedToUnboundedSourceAdapter.this;
            }
        }

        /**
         * Coder for {@link Checkpoint}: encodes the nullable list of residual elements followed by
         * the nullable residual source, the latter through {@link SerializableCoder}.
         */
        @VisibleForTesting
        static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
            /** Nullable coder for the list of residual timestamped elements. */
            private final Coder<List<TimestampedValue<T>>> elemsCoder;
            /** Coder for a single element value; reported as this coder's only argument. */
            private final Coder<T> elemCoder;
            /** Nullable coder for the residual bounded source. */
            @SuppressWarnings("rawtypes")
            private final Coder<BoundedSource> sourceCoder;

            /**
             * JSON factory: builds a coder from exactly one component coder.
             *
             * @param components the component encodings; must contain exactly one coder
             * @throws IllegalArgumentException if the list does not have exactly one entry
             */
            @JsonCreator
            public static CheckpointCoder<?> of(@JsonProperty(value="component_encodings") List<Coder<?>> components) {
                int componentCount = components.size();
                Preconditions.checkArgument(componentCount == 1, "Expecting 1 components, got %s", componentCount);
                Coder<?> onlyComponent = components.get(0);
                return new CheckpointCoder<>(onlyComponent);
            }

            /**
             * @param elemCoder the coder for individual element values
             */
            CheckpointCoder(Coder<T> elemCoder) {
                Coder<TimestampedValue<T>> timestampedCoder = TimestampedValue.TimestampedValueCoder.of(elemCoder);
                Coder<List<TimestampedValue<T>>> listCoder = ListCoder.of(timestampedCoder);
                this.elemsCoder = NullableCoder.of(listCoder);
                this.elemCoder = elemCoder;
                this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
            }

            /** Writes the residual elements and then the residual source, both in the nested context. */
            @Override
            public void encode(Checkpoint<T> value, OutputStream outStream, Coder.Context context) throws CoderException, IOException {
                Coder.Context nestedContext = context.nested();
                List<TimestampedValue<T>> elements = value.getResidualElements();
                BoundedSource<T> source = value.getResidualSource();
                this.elemsCoder.encode(elements, outStream, nestedContext);
                this.sourceCoder.encode(source, outStream, nestedContext);
            }

            /** Reads the residual elements and then the residual source, both in the nested context. */
            @Override
            public Checkpoint<T> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException {
                Coder.Context nestedContext = context.nested();
                List<TimestampedValue<T>> elements = this.elemsCoder.decode(inStream, nestedContext);
                @SuppressWarnings("unchecked")
                BoundedSource<T> source = this.sourceCoder.decode(inStream, nestedContext);
                return new Checkpoint<T>(elements, source);
            }

            /** Returns a single-element list holding the element coder. */
            @Override
            public List<Coder<?>> getCoderArguments() {
                return Arrays.<Coder<?>>asList(this.elemCoder);
            }

            /** Always throws: this coder is reported as non-deterministic. */
            @Override
            public void verifyDeterministic() throws Coder.NonDeterministicException {
                throw new Coder.NonDeterministicException(this, "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
            }
        }

        /**
         * Checkpoint mark holding what is left to read: an optional list of already-read but
         * not-yet-emitted elements and an optional residual bounded source. Either part may be
         * {@code null}.
         */
        @VisibleForTesting
        static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
            @Nullable
            private final List<TimestampedValue<T>> residualElements;
            @Nullable
            private final BoundedSource<T> residualSource;

            /**
             * @param residualElements buffered elements still to be emitted, or {@code null}
             * @param residualSource bounded source still to be read, or {@code null}
             */
            public Checkpoint(@Nullable List<TimestampedValue<T>> residualElements, @Nullable BoundedSource<T> residualSource) {
                this.residualElements = residualElements;
                this.residualSource = residualSource;
            }

            /** Does nothing. */
            @Override
            public void finalizeCheckpoint() {
                // Intentionally empty.
            }

            /** Returns the buffered residual elements, or {@code null} if none were recorded. */
            @Nullable
            @VisibleForTesting
            List<TimestampedValue<T>> getResidualElements() {
                return this.residualElements;
            }

            /** Returns the residual bounded source, or {@code null} if none was recorded. */
            @Nullable
            @VisibleForTesting
            BoundedSource<T> getResidualSource() {
                return this.residualSource;
            }
        }
    }
}

