/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SchemaHelpers;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.parquet.Strings;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.joda.time.Instant;

/**
 * Spark DataSource V2 {@link TableProvider} that exposes a Beam {@link BoundedSource} as a
 * batch-readable table.
 *
 * <p>Every row produced by this source follows the schema returned by
 * {@link SchemaHelpers#binarySchema()} and is built by
 * {@link RowHelpers#storeWindowedValueInRow} from a {@link WindowedValue} placed in the global
 * window.
 *
 * <p>The source is configured through three DataSource options, all of which are required:
 * {@code beam-source} (Base64-serialized {@link BoundedSource}), {@code default-parallelism}
 * (positive integer) and {@code pipeline-options} (serialized pipeline options).
 */
public class DatasetSourceBatch implements TableProvider {
    private static final StructType BINARY_SCHEMA = SchemaHelpers.binarySchema();

    /**
     * Returns the fixed binary schema; the supplied options are not inspected.
     *
     * @param options DataSource options (ignored)
     * @return the schema shared by every row of this source
     */
    public StructType inferSchema(CaseInsensitiveStringMap options) {
        return BINARY_SCHEMA;
    }

    /**
     * @return always {@code true}
     */
    public boolean supportsExternalMetadata() {
        return true;
    }

    /**
     * Creates the table backing this provider. None of the arguments are used: the schema is
     * fixed and the actual configuration is read later from the scan options.
     *
     * @param schema ignored
     * @param partitioning ignored
     * @param properties ignored
     * @return a new read-only Beam table
     */
    public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
        return new DatasetSourceBatchTable();
    }

    /** Table that supports batch reads only and delegates scanning to {@link BeamBatch}. */
    private static class DatasetSourceBatchTable implements SupportsRead {
        private DatasetSourceBatchTable() {
        }

        /**
         * Builds a scan whose read schema is the binary schema and whose batch is a
         * {@link BeamBatch} configured from {@code options}.
         *
         * @param options DataSource options forwarded to {@link BeamBatch}
         * @return a scan builder producing a single kind of scan
         */
        public ScanBuilder newScanBuilder(final CaseInsensitiveStringMap options) {
            return new ScanBuilder() {

                public Scan build() {
                    return new Scan() {

                        public StructType readSchema() {
                            return BINARY_SCHEMA;
                        }

                        public Batch toBatch() {
                            // The options are only parsed here, so configuration errors
                            // surface when the batch is requested.
                            return new BeamBatch<>(options);
                        }
                    };
                }
            };
        }

        /**
         * @return the constant table name {@code "BeamSource"}
         */
        public String name() {
            return "BeamSource";
        }

        /**
         * @return the fixed binary schema
         */
        public StructType schema() {
            return BINARY_SCHEMA;
        }

        /**
         * @return an immutable set containing only {@link TableCapability#BATCH_READ}
         */
        public Set<TableCapability> capabilities() {
            return ImmutableSet.of(TableCapability.BATCH_READ);
        }

        /**
         * Batch that splits the configured {@link BoundedSource} into input partitions and
         * creates the readers consuming them.
         *
         * @param <T> element type produced by the Beam source
         */
        private static class BeamBatch<T> implements Batch, Serializable {
            private static final String BEAM_SOURCE_OPTION = "beam-source";
            private static final String DEFAULT_PARALLELISM_OPTION = "default-parallelism";
            private static final String PIPELINE_OPTIONS_OPTION = "pipeline-options";

            private final int numPartitions;
            private final BoundedSource<T> source;
            private final SerializablePipelineOptions serializablePipelineOptions;

            /**
             * Reads and validates the three required DataSource options.
             *
             * @param options DataSource options carrying the serialized source, the
             *     parallelism and the serialized pipeline options
             * @throws RuntimeException if any required option is missing or empty
             * @throws NumberFormatException if the parallelism option is not an integer
             * @throws IllegalArgumentException if the parallelism is not greater than zero
             */
            // The element type of the deserialized source cannot be verified at runtime.
            @SuppressWarnings("unchecked")
            private BeamBatch(CaseInsensitiveStringMap options) {
                String serializedSource = requiredOption(
                    options, BEAM_SOURCE_OPTION, "Beam source was not set in DataSource options");
                this.source = (BoundedSource<T>) Base64Serializer.deserializeUnchecked(
                    serializedSource, BoundedSource.class);

                // Validate the option VALUE (not the key literal) so that a missing setting
                // yields the descriptive error below rather than a NumberFormatException.
                String parallelism = requiredOption(
                    options,
                    DEFAULT_PARALLELISM_OPTION,
                    "Spark default parallelism was not set in DataSource options");
                this.numPartitions = Integer.parseInt(parallelism);
                Preconditions.checkArgument(
                    this.numPartitions > 0, "Number of partitions must be greater than zero.");

                String pipelineOptions = requiredOption(
                    options,
                    PIPELINE_OPTIONS_OPTION,
                    "Beam pipelineOptions were not set in DataSource options");
                this.serializablePipelineOptions = new SerializablePipelineOptions(pipelineOptions);
            }

            /** Returns the value stored under {@code key}, failing with {@code errorMessage} if it is null or empty. */
            private static String requiredOption(
                    CaseInsensitiveStringMap options, String key, String errorMessage) {
                String value = options.get(key);
                if (Strings.isNullOrEmpty(value)) {
                    throw new RuntimeException(errorMessage);
                }
                return value;
            }

            /**
             * Splits the source into bundles of roughly
             * {@code estimatedSizeBytes / numPartitions} bytes and wraps each split in an
             * input partition.
             *
             * @return one input partition per split returned by the source
             * @throws RuntimeException wrapping any failure raised while estimating the size
             *     or splitting the source
             */
            public InputPartition[] planInputPartitions() {
                PipelineOptions options = this.serializablePipelineOptions.get();
                try {
                    long desiredSizeBytes =
                        this.source.getEstimatedSizeBytes(options) / (long) this.numPartitions;
                    List<? extends BoundedSource<T>> splits =
                        this.source.split(desiredSizeBytes, options);
                    InputPartition[] result = new InputPartition[splits.size()];
                    int i = 0;
                    for (BoundedSource<T> split : splits) {
                        result[i++] = new BeamInputPartition<>(split);
                    }
                    return result;
                } catch (Exception e) {
                    throw new RuntimeException(
                        "Error in splitting BoundedSource " + this.source.getClass().getCanonicalName(), e);
                }
            }

            /**
             * @return a factory creating a {@link BeamPartitionReader} for each
             *     {@link BeamInputPartition} it is given
             */
            public PartitionReaderFactory createReaderFactory() {
                return new PartitionReaderFactory() {

                    // Partitions are only ever created by planInputPartitions(), so the cast
                    // to BeamInputPartition<T> is expected to hold.
                    @SuppressWarnings("unchecked")
                    public PartitionReader<InternalRow> createReader(InputPartition partition) {
                        BoundedSource<T> split = ((BeamInputPartition<T>) partition).getSource();
                        return new BeamPartitionReader<>(split, serializablePipelineOptions);
                    }
                };
            }

            /**
             * Adapts a Beam {@link BoundedSource.BoundedReader} to Spark's
             * {@link PartitionReader} contract.
             *
             * @param <T> element type produced by the Beam reader
             */
            private static class BeamPartitionReader<T> implements PartitionReader<InternalRow> {
                private final BoundedSource<T> source;
                private final BoundedSource.BoundedReader<T> reader;
                private boolean started = false;
                private boolean closed = false;

                /**
                 * Creates the underlying Beam reader for {@code source}.
                 *
                 * @param source the split to read
                 * @param serializablePipelineOptions options used to create the reader
                 * @throws RuntimeException wrapping the {@link IOException} if the reader
                 *     cannot be created
                 */
                BeamPartitionReader(
                        BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
                    this.source = source;
                    try {
                        this.reader =
                            source.createReader(serializablePipelineOptions.get().as(PipelineOptions.class));
                    } catch (IOException e) {
                        throw new RuntimeException("Error creating BoundedReader ", e);
                    }
                }

                /**
                 * Moves to the next element: the first call starts the Beam reader, later
                 * calls advance it.
                 *
                 * @return {@code true} if an element is available through {@link #get()};
                 *     {@code false} if the reader is exhausted or already closed
                 * @throws IOException if the underlying reader fails
                 */
                public boolean next() throws IOException {
                    // Never touch the underlying reader once it has been closed, including
                    // the case where close() was called before the first next().
                    if (this.closed) {
                        return false;
                    }
                    if (!this.started) {
                        this.started = true;
                        return this.reader.start();
                    }
                    return this.reader.advance();
                }

                /**
                 * Wraps the current element and its timestamp in a global-window
                 * {@link WindowedValue} and stores it in a row using the source's output
                 * coder.
                 *
                 * @return the row holding the current element
                 */
                public InternalRow get() {
                    WindowedValue<T> windowedValue = WindowedValue.timestampedValueInGlobalWindow(
                        this.reader.getCurrent(), this.reader.getCurrentTimestamp());
                    return RowHelpers.storeWindowedValueInRow(windowedValue, this.source.getOutputCoder());
                }

                /**
                 * Marks this reader as closed and closes the underlying Beam reader.
                 *
                 * @throws IOException if closing the underlying reader fails
                 */
                public void close() throws IOException {
                    this.closed = true;
                    this.reader.close();
                }
            }

            /**
             * Input partition carrying one split of the Beam source.
             *
             * @param <T> element type produced by the split
             */
            private static class BeamInputPartition<T> implements InputPartition {
                private final BoundedSource<T> source;

                private BeamInputPartition(BoundedSource<T> source) {
                    this.source = source;
                }

                /**
                 * @return the split wrapped by this partition
                 */
                public BoundedSource<T> getSource() {
                    return this.source;
                }
            }
        }
    }
}

