/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.google.auto.value.AutoValue;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.cassandra.AutoValue_CassandraIO_Read;
import org.apache.beam.sdk.io.cassandra.AutoValue_CassandraIO_Write;
import org.apache.beam.sdk.io.cassandra.DefaultObjectMapperFactory;
import org.apache.beam.sdk.io.cassandra.Mapper;
import org.apache.beam.sdk.io.cassandra.RingRange;
import org.apache.beam.sdk.io.cassandra.SplitGenerator;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class CassandraIO {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraIO.class);

    /** Private constructor: prevents instantiation from outside this class. */
    private CassandraIO() {
    }

    /**
     * Creates an unconfigured {@link Read} transform.
     *
     * @param <T> the entity type produced by the read
     * @return a {@link Read} built from an empty AutoValue builder
     */
    public static <T> Read<T> read() {
        // Explicit type argument avoids a raw builder and the resulting unchecked conversion.
        return new AutoValue_CassandraIO_Read.Builder<T>().build();
    }

    /**
     * Creates a {@link Write} transform whose mutation type is {@link MutationType#WRITE}.
     *
     * @param <T> the entity type being written
     * @return a {@link Write} with only the mutation type set
     */
    public static <T> Write<T> write() {
        // The explicit <T> witness is required: without it the chained call infers Write<Object>.
        return Write.<T>builder(MutationType.WRITE).build();
    }

    /**
     * Creates a {@link Write} transform whose mutation type is {@link MutationType#DELETE}.
     *
     * @param <T> the entity type being deleted
     * @return a {@link Write} with only the mutation type set
     */
    public static <T> Write<T> delete() {
        // The explicit <T> witness is required: without it the chained call infers Write<Object>.
        return Write.<T>builder(MutationType.DELETE).build();
    }

    /**
     * Builds a Cassandra {@link Cluster} from the supplied connection settings.
     *
     * <p>{@code hosts} and {@code port} are dereferenced unconditionally; every other argument is
     * optional and is only applied when it is non-null.
     *
     * @param hosts contact points of the cluster
     * @param port port used to reach the contact points
     * @param username user for plain-text authentication, or {@code null} for no authentication
     * @param password password for plain-text authentication; required when {@code username} is set
     * @param localDc local data center for the DC-aware load balancing policy, or {@code null}
     * @param consistencyLevel name of a {@link ConsistencyLevel} constant, or {@code null}
     * @param connectTimeout socket connect timeout in milliseconds, or {@code null}
     * @param readTimeout socket read timeout in milliseconds, or {@code null}
     * @return the built cluster; the caller is responsible for closing it
     * @throws IllegalArgumentException if a username is given without a password
     */
    private static Cluster getCluster(ValueProvider<List<String>> hosts, ValueProvider<Integer> port, ValueProvider<String> username, ValueProvider<String> password, ValueProvider<String> localDc, ValueProvider<String> consistencyLevel, ValueProvider<Integer> connectTimeout, ValueProvider<Integer> readTimeout) {
        Cluster.Builder builder =
            Cluster.builder()
                .addContactPoints(hosts.get().toArray(new String[0]))
                .withPort(port.get());

        if (username != null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            Preconditions.checkArgument(
                password != null, (Object) "CassandraIO: a username was provided without a password");
            builder.withAuthProvider(new PlainTextAuthProvider(username.get(), password.get()));
        }

        DCAwareRoundRobinPolicy.Builder dcAwarePolicyBuilder = new DCAwareRoundRobinPolicy.Builder();
        if (localDc != null) {
            dcAwarePolicyBuilder.withLocalDc(localDc.get());
        }
        builder.withLoadBalancingPolicy(new TokenAwarePolicy(dcAwarePolicyBuilder.build()));

        if (consistencyLevel != null) {
            builder.withQueryOptions(
                new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
        }

        SocketOptions socketOptions = new SocketOptions();
        if (connectTimeout != null) {
            socketOptions.setConnectTimeoutMillis(connectTimeout.get());
        }
        if (readTimeout != null) {
            socketOptions.setReadTimeoutMillis(readTimeout.get());
        }
        builder.withSocketOptions(socketOptions);

        return builder.build();
    }

    /**
     * Issues asynchronous mutations (the operation is supplied by the caller) against a Cassandra
     * session and waits for them in batches of {@link #CONCURRENT_ASYNC_QUERIES}.
     *
     * <p>The constructor opens a cluster connection and a session; {@link #close()} releases both.
     */
    private static class Mutator<T> {
        /** Number of in-flight futures after which {@link #mutate} blocks until all complete. */
        private static final int CONCURRENT_ASYNC_QUERIES = 100;
        private final Cluster cluster;
        private final Session session;
        private final SerializableFunction<Session, Mapper> mapperFactoryFn;
        private List<Future<Void>> mutateFutures;
        private final BiFunction<Mapper<T>, T, Future<Void>> mutator;
        private final String operationName;

        /**
         * @param spec write configuration providing connection settings, keyspace and mapper factory
         * @param mutator operation applied to each entity through the mapper
         * @param operationName plural operation name used in log messages
         */
        Mutator(Write<T> spec, BiFunction<Mapper<T>, T, Future<Void>> mutator, String operationName) {
            Cluster newCluster = CassandraIO.getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), spec.localDc(), spec.consistencyLevel(), spec.connectTimeout(), spec.readTimeout());
            Session newSession;
            try {
                newSession = newCluster.connect(spec.keyspace().get());
            } catch (RuntimeException e) {
                // Do not leak the cluster when the session cannot be opened.
                newCluster.close();
                throw e;
            }
            this.cluster = newCluster;
            this.session = newSession;
            this.mapperFactoryFn = spec.mapperFactoryFn();
            this.mutateFutures = new ArrayList<>();
            this.mutator = mutator;
            this.operationName = operationName;
        }

        /**
         * Submits one asynchronous mutation for {@code entity}; once the number of pending futures
         * reaches {@link #CONCURRENT_ASYNC_QUERIES}, blocks until all of them have completed.
         *
         * @throws ExecutionException if a pending mutation failed
         * @throws InterruptedException if interrupted while waiting for pending mutations
         */
        void mutate(T entity) throws ExecutionException, InterruptedException {
            // The factory is declared with a raw Mapper, hence the unchecked conversion.
            @SuppressWarnings("unchecked")
            Mapper<T> mapper = this.mapperFactoryFn.apply(this.session);
            this.mutateFutures.add(this.mutator.apply(mapper, entity));
            if (this.mutateFutures.size() == CONCURRENT_ASYNC_QUERIES) {
                LOG.debug("Waiting for a batch of {} Cassandra {} to be executed...", CONCURRENT_ASYNC_QUERIES, this.operationName);
                this.waitForFuturesToFinish();
                this.mutateFutures = new ArrayList<>();
            }
        }

        /**
         * Waits for any pending mutations, then closes the session and the cluster. The session and
         * cluster are closed even when waiting for the pending mutations fails.
         *
         * @throws ExecutionException if a pending mutation failed
         * @throws InterruptedException if interrupted while waiting for pending mutations
         */
        void close() throws ExecutionException, InterruptedException {
            try {
                if (!this.mutateFutures.isEmpty()) {
                    this.waitForFuturesToFinish();
                }
            } finally {
                try {
                    if (this.session != null) {
                        this.session.close();
                    }
                } finally {
                    if (this.cluster != null) {
                        this.cluster.close();
                    }
                }
            }
        }

        /** Blocks on every pending future in submission order. */
        private void waitForFuturesToFinish() throws ExecutionException, InterruptedException {
            for (Future<Void> future : this.mutateFutures) {
                future.get();
            }
        }
    }

    /**
     * {@link DoFn} that deletes each input element through {@code Mapper::deleteAsync}.
     *
     * <p>NOTE(review): pending deletes are only awaited when the batch threshold is hit or in
     * {@link #teardown()}; consider also flushing at bundle boundaries — confirm against the
     * runner's teardown guarantees.
     */
    private static class DeleteFn<T>
    extends DoFn<T, Void> {
        private final Write<T> spec;
        private transient Mutator<T> deleter;

        DeleteFn(Write<T> spec) {
            this.spec = spec;
        }

        /** Opens the Cassandra connection used by this instance. */
        @DoFn.Setup
        public void setup() {
            this.deleter = new Mutator<>(this.spec, Mapper::deleteAsync, "deletes");
        }

        /** Submits an asynchronous delete for the current element. */
        @DoFn.ProcessElement
        public void processElement(ProcessContext c) throws ExecutionException, InterruptedException {
            this.deleter.mutate(c.element());
        }

        /** Waits for pending deletes and releases the connection, if one was opened. */
        @DoFn.Teardown
        public void teardown() throws Exception {
            // setup() may have failed before the mutator was assigned.
            if (this.deleter != null) {
                this.deleter.close();
                this.deleter = null;
            }
        }
    }

    /**
     * {@link DoFn} that saves each input element through {@code Mapper::saveAsync}.
     *
     * <p>NOTE(review): pending writes are only awaited when the batch threshold is hit or in
     * {@link #teardown()}; consider also flushing at bundle boundaries — confirm against the
     * runner's teardown guarantees.
     */
    private static class WriteFn<T>
    extends DoFn<T, Void> {
        private final Write<T> spec;
        private transient Mutator<T> writer;

        WriteFn(Write<T> spec) {
            this.spec = spec;
        }

        /** Opens the Cassandra connection used by this instance. */
        @DoFn.Setup
        public void setup() {
            this.writer = new Mutator<>(this.spec, Mapper::saveAsync, "writes");
        }

        /** Submits an asynchronous save for the current element. */
        @DoFn.ProcessElement
        public void processElement(ProcessContext c) throws ExecutionException, InterruptedException {
            this.writer.mutate(c.element());
        }

        /** Waits for pending writes and releases the connection, if one was opened. */
        @DoFn.Teardown
        public void teardown() throws Exception {
            // setup() may have failed before the mutator was assigned.
            if (this.writer != null) {
                this.writer.close();
                this.writer = null;
            }
        }
    }

    /**
     * A {@link PTransform} that applies a mutation (save or delete, depending on
     * {@link #mutationType()}) to Cassandra for every element of the input collection.
     *
     * <p>Each {@code withXxx} method returns a new instance obtained through {@link #builder()};
     * the receiver is not modified.
     */
    @AutoValue
    public abstract static class Write<T>
    extends PTransform<PCollection<T>, PDone> {
        @Nullable
        abstract ValueProvider<List<String>> hosts();

        @Nullable
        abstract ValueProvider<Integer> port();

        @Nullable
        abstract ValueProvider<String> keyspace();

        @Nullable
        abstract Class<T> entity();

        @Nullable
        abstract ValueProvider<String> username();

        @Nullable
        abstract ValueProvider<String> password();

        @Nullable
        abstract ValueProvider<String> localDc();

        @Nullable
        abstract ValueProvider<String> consistencyLevel();

        abstract MutationType mutationType();

        @Nullable
        abstract ValueProvider<Integer> connectTimeout();

        @Nullable
        abstract ValueProvider<Integer> readTimeout();

        @Nullable
        abstract SerializableFunction<Session, Mapper> mapperFactoryFn();

        /** Returns a builder pre-populated with this instance's values. */
        abstract Builder<T> builder();

        /** Returns a fresh builder with only the mutation type set. */
        static <T> Builder<T> builder(MutationType mutationType) {
            return new AutoValue_CassandraIO_Write.Builder<T>().setMutationType(mutationType);
        }

        /** Sets the Cassandra contact points; {@code hosts} must be non-null and non-empty. */
        public Write<T> withHosts(List<String> hosts) {
            Preconditions.checkArgument(hosts != null, (Object) (apiName() + ".withHosts(hosts) called with null hosts"));
            Preconditions.checkArgument(!hosts.isEmpty(), (Object) (apiName() + ".withHosts(hosts) called with empty hosts list"));
            return withHosts(ValueProvider.StaticValueProvider.of(hosts));
        }

        /** Sets the Cassandra contact points from a {@link ValueProvider}. */
        public Write<T> withHosts(ValueProvider<List<String>> hosts) {
            return builder().setHosts(hosts).build();
        }

        /** Sets the Cassandra port; {@code port} must be strictly positive. */
        public Write<T> withPort(int port) {
            Preconditions.checkArgument(port > 0, apiName() + ".withPort(port) called with invalid port number (%s)", port);
            return withPort(ValueProvider.StaticValueProvider.of(port));
        }

        /** Sets the Cassandra port from a {@link ValueProvider}. */
        public Write<T> withPort(ValueProvider<Integer> port) {
            return builder().setPort(port).build();
        }

        /** Sets the keyspace; {@code keyspace} must be non-null. */
        public Write<T> withKeyspace(String keyspace) {
            Preconditions.checkArgument(keyspace != null, (Object) (apiName() + ".withKeyspace(keyspace) called with null keyspace"));
            return withKeyspace(ValueProvider.StaticValueProvider.of(keyspace));
        }

        /** Sets the keyspace from a {@link ValueProvider}. */
        public Write<T> withKeyspace(ValueProvider<String> keyspace) {
            return builder().setKeyspace(keyspace).build();
        }

        /** Sets the entity class; {@code entity} must be non-null. */
        public Write<T> withEntity(Class<T> entity) {
            Preconditions.checkArgument(entity != null, (Object) (apiName() + ".withEntity(entity) called with null entity"));
            return builder().setEntity(entity).build();
        }

        /** Sets the authentication username; {@code username} must be non-null. */
        public Write<T> withUsername(String username) {
            Preconditions.checkArgument(username != null, (Object) (apiName() + ".withUsername(username) called with null username"));
            return withUsername(ValueProvider.StaticValueProvider.of(username));
        }

        /** Sets the authentication username from a {@link ValueProvider}. */
        public Write<T> withUsername(ValueProvider<String> username) {
            return builder().setUsername(username).build();
        }

        /** Sets the authentication password; {@code password} must be non-null. */
        public Write<T> withPassword(String password) {
            Preconditions.checkArgument(password != null, (Object) (apiName() + ".withPassword(password) called with null password"));
            return withPassword(ValueProvider.StaticValueProvider.of(password));
        }

        /** Sets the authentication password from a {@link ValueProvider}. */
        public Write<T> withPassword(ValueProvider<String> password) {
            return builder().setPassword(password).build();
        }

        /** Sets the local data center; {@code localDc} must be non-null. */
        public Write<T> withLocalDc(String localDc) {
            Preconditions.checkArgument(localDc != null, (Object) (apiName() + ".withLocalDc(localDc) called with null localDc"));
            return withLocalDc(ValueProvider.StaticValueProvider.of(localDc));
        }

        /** Sets the local data center from a {@link ValueProvider}. */
        public Write<T> withLocalDc(ValueProvider<String> localDc) {
            return builder().setLocalDc(localDc).build();
        }

        /** Sets the consistency level name; {@code consistencyLevel} must be non-null. */
        public Write<T> withConsistencyLevel(String consistencyLevel) {
            Preconditions.checkArgument(consistencyLevel != null, (Object) (apiName() + ".withConsistencyLevel(consistencyLevel) called with null consistencyLevel"));
            return withConsistencyLevel(ValueProvider.StaticValueProvider.of(consistencyLevel));
        }

        /** Sets the consistency level name from a {@link ValueProvider}. */
        public Write<T> withConsistencyLevel(ValueProvider<String> consistencyLevel) {
            return builder().setConsistencyLevel(consistencyLevel).build();
        }

        /** Sets the socket connect timeout; {@code timeout} must be non-null and strictly positive. */
        public Write<T> withConnectTimeout(Integer timeout) {
            Preconditions.checkArgument(timeout != null && timeout > 0, apiName() + ".withConnectTimeout(timeout) called with invalid timeout number (%s)", (Object) timeout);
            return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
        }

        /** Sets the socket connect timeout from a {@link ValueProvider}. */
        public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {
            return builder().setConnectTimeout(timeout).build();
        }

        /** Sets the socket read timeout; {@code timeout} must be non-null and strictly positive. */
        public Write<T> withReadTimeout(Integer timeout) {
            Preconditions.checkArgument(timeout != null && timeout > 0, apiName() + ".withReadTimeout(timeout) called with invalid timeout number (%s)", (Object) timeout);
            return withReadTimeout(ValueProvider.StaticValueProvider.of(timeout));
        }

        /** Sets the socket read timeout from a {@link ValueProvider}. */
        public Write<T> withReadTimeout(ValueProvider<Integer> timeout) {
            return builder().setReadTimeout(timeout).build();
        }

        /** Sets the factory that creates a {@link Mapper} from a session; must be non-null. */
        public Write<T> withMapperFactoryFn(SerializableFunction<Session, Mapper> mapperFactoryFn) {
            // The message now names the actual method (it previously said "mapperFactoryFn(...)").
            Preconditions.checkArgument(mapperFactoryFn != null, (Object) (apiName() + ".withMapperFactoryFn(mapperFactoryFn) called with null value"));
            return builder().setMapperFactoryFn(mapperFactoryFn).build();
        }

        /**
         * Checks that hosts, port, keyspace and entity have all been set.
         *
         * @throws IllegalStateException if any of them is missing
         */
        @Override
        public void validate(PipelineOptions pipelineOptions) {
            Preconditions.checkState(hosts() != null, (Object) (apiName() + " requires a list of hosts to be set via withHosts(hosts)"));
            Preconditions.checkState(port() != null, (Object) (apiName() + " requires a valid port number to be set via withPort(port)"));
            Preconditions.checkState(keyspace() != null, (Object) (apiName() + " requires a keyspace to be set via withKeyspace(keyspace)"));
            Preconditions.checkState(entity() != null, (Object) (apiName() + " requires an entity to be set via withEntity(entity)"));
        }

        /** Applies a {@link DeleteFn} or a {@link WriteFn} to the input, depending on the mutation type. */
        @Override
        public PDone expand(PCollection<T> input) {
            if (mutationType() == MutationType.DELETE) {
                input.apply(ParDo.of(new DeleteFn<>(this)));
            } else {
                input.apply(ParDo.of(new WriteFn<>(this)));
            }
            return PDone.in(input.getPipeline());
        }

        /** Returns the {@code CassandraIO.<mutation>()} prefix used in validation messages. */
        private String apiName() {
            return "CassandraIO." + getMutationTypeName() + "()";
        }

        /** Returns the lower-cased mutation type name, defaulting to {@code write} when unset. */
        private String getMutationTypeName() {
            MutationType type = mutationType() == null ? MutationType.WRITE : mutationType();
            // Locale.ROOT keeps the name stable under locales with special casing rules (e.g. Turkish).
            return type.name().toLowerCase(java.util.Locale.ROOT);
        }

        /** AutoValue builder for {@link Write}. */
        @AutoValue.Builder
        abstract static class Builder<T> {
            Builder() {
            }

            abstract Builder<T> setHosts(ValueProvider<List<String>> var1);

            abstract Builder<T> setPort(ValueProvider<Integer> var1);

            abstract Builder<T> setKeyspace(ValueProvider<String> var1);

            abstract Builder<T> setEntity(Class<T> var1);

            abstract Optional<Class<T>> entity();

            abstract Builder<T> setUsername(ValueProvider<String> var1);

            abstract Builder<T> setPassword(ValueProvider<String> var1);

            abstract Builder<T> setLocalDc(ValueProvider<String> var1);

            abstract Builder<T> setConsistencyLevel(ValueProvider<String> var1);

            abstract Builder<T> setMutationType(MutationType var1);

            abstract Builder<T> setConnectTimeout(ValueProvider<Integer> var1);

            abstract Builder<T> setReadTimeout(ValueProvider<Integer> var1);

            abstract Builder<T> setMapperFactoryFn(SerializableFunction<Session, Mapper> var1);

            abstract Optional<SerializableFunction<Session, Mapper>> mapperFactoryFn();

            abstract Write<T> autoBuild();

            /**
             * Builds the {@link Write}; when no mapper factory was set but an entity class is
             * known, a {@link DefaultObjectMapperFactory} for that entity is installed first.
             */
            public Write<T> build() {
                if (!this.mapperFactoryFn().isPresent() && this.entity().isPresent()) {
                    this.setMapperFactoryFn(new DefaultObjectMapperFactory<T>(this.entity().get()));
                }
                return this.autoBuild();
            }
        }
    }

    /** The kind of mutation a {@link Write} transform applies to each element. */
    public enum MutationType {
        WRITE,
        DELETE
    }

    @VisibleForTesting
    static class CassandraSource<T>
    extends BoundedSource<T> {
        final Read<T> spec;
        final List<String> splitQueries;
        Long estimatedSize;
        private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";

        /**
         * Creates a source for the given read spec and queries, with no precomputed size estimate
         * (delegates to the three-argument constructor with a {@code null} estimate).
         */
        CassandraSource(Read<T> spec, List<String> splitQueries) {
            this(spec, splitQueries, null);
        }

        /**
         * Creates a source for the given read spec and queries.
         *
         * @param estimatedSize precomputed size estimate in bytes, or {@code null} if unknown
         */
        private CassandraSource(Read<T> spec, List<String> splitQueries, Long estimatedSize) {
            this.spec = spec;
            this.splitQueries = splitQueries;
            this.estimatedSize = estimatedSize;
        }

        /** Returns the coder configured on the read spec. */
        @Override
        public Coder<T> getOutputCoder() {
            return spec.coder();
        }

        /** Creates a new {@link CassandraReader} bound to this source. */
        @Override
        public BoundedSource.BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
            CassandraReader reader = new CassandraReader(this);
            return reader;
        }

        /**
         * Splits this source. When the cluster reports the Murmur3 partitioner, the split is based
         * on token ranges; otherwise a single source carrying the unsplit query is returned.
         */
        @Override
        public List<BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions pipelineOptions) {
            try (Cluster cluster = CassandraIO.getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), spec.localDc(), spec.consistencyLevel(), spec.connectTimeout(), spec.readTimeout())) {
                if (!isMurmur3Partitioner(cluster)) {
                    LOG.warn("Only Murmur3Partitioner is supported for splitting, using a unique source for the read");
                    List<BoundedSource<T>> single = Collections.singletonList(new CassandraSource<T>(spec, Collections.singletonList(buildQuery(spec))));
                    return single;
                }
                LOG.info("Murmur3Partitioner detected, splitting");
                return splitWithTokenRanges(spec, desiredBundleSizeBytes, getEstimatedSizeBytes(pipelineOptions), cluster);
            }
        }

        /**
         * Returns the query configured on {@code spec}, or a {@code SELECT *} over the spec's
         * keyspace and table when no query is configured.
         */
        private static String buildQuery(Read spec) {
            if (spec.query() != null) {
                return spec.query().get().toString();
            }
            return String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get());
        }

        /**
         * Builds one {@link CassandraSource} per split produced by {@link SplitGenerator}. Each
         * source carries the token-range queries of its split and an equal share of the total
         * size estimate.
         */
        private List<BoundedSource<T>> splitWithTokenRanges(Read<T> spec, long desiredBundleSizeBytes, long estimatedSizeBytes, Cluster cluster) {
            long numSplits = getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes, spec.minNumberOfSplits());
            LOG.info("Number of desired splits is {}", numSplits);

            SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner());
            List<BigInteger> tokens =
                cluster.getMetadata().getTokenRanges().stream()
                    .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
                    .collect(Collectors.toList());
            List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, tokens);
            LOG.info("{} splits were actually generated", splits.size());

            String partitionKey =
                cluster.getMetadata()
                    .getKeyspace(spec.keyspace().get())
                    .getTable(spec.table().get())
                    .getPartitionKey().stream()
                    .map(ColumnMetadata::getName)
                    .collect(Collectors.joining(","));

            List<TokenRange> tokenRanges = getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
            long sizePerSplit = getEstimatedSizeBytesFromTokenRanges(tokenRanges) / splits.size();

            List<BoundedSource<T>> sources = new ArrayList<>();
            for (List<RingRange> split : splits) {
                List<String> queries = new ArrayList<>();
                for (RingRange range : split) {
                    if (range.isWrapping()) {
                        // A wrapping range is expressed as two open-ended queries.
                        queries.add(generateRangeQuery(spec, partitionKey, range.getStart(), null));
                        queries.add(generateRangeQuery(spec, partitionKey, null, range.getEnd()));
                    } else {
                        queries.add(generateRangeQuery(spec, partitionKey, range.getStart(), range.getEnd()));
                    }
                }
                sources.add(new CassandraSource<T>(spec, queries, sizePerSplit));
            }
            return sources;
        }

        /**
         * Builds the query for one token range by appending a {@code token(partitionKey)} filter
         * to the base query.
         *
         * <p>The filter is appended with {@code WHERE} when the spec has no custom query and with
         * {@code AND} otherwise (a custom query is therefore expected to already have a
         * {@code WHERE} clause).
         *
         * @param rangeStart inclusive lower token bound, or {@code null} for no lower bound
         * @param rangeEnd exclusive upper token bound, or {@code null} for no upper bound
         */
        private static String generateRangeQuery(Read spec, String partitionKey, BigInteger rangeStart, BigInteger rangeEnd) {
            // Plain concatenation (BigInteger.toString) instead of String.format("%d"): the latter
            // localizes digits under some default locales, which would produce invalid CQL.
            String startFilter = rangeStart == null ? null : "(token(" + partitionKey + ") >= " + rangeStart + ")";
            String endFilter = rangeEnd == null ? null : "(token(" + partitionKey + ") < " + rangeEnd + ")";
            String rangeFilter = Joiner.on(" AND ").skipNulls().join(startFilter, endFilter);
            String connector = spec.query() == null ? " WHERE " : " AND ";
            String query = CassandraSource.buildQuery(spec) + connector + rangeFilter;
            LOG.debug("CassandraIO generated query : {}", query);
            return query;
        }

        /**
         * Computes the number of splits as {@code estimatedSizeBytes / desiredBundleSizeBytes}
         * (1 when the desired bundle size is not positive or the quotient is not positive), raised
         * to {@code minNumberOfSplits} when that is provided.
         */
        private static long getNumSplits(long desiredBundleSizeBytes, long estimatedSizeBytes, @Nullable ValueProvider<Integer> minNumberOfSplits) {
            long numSplits = 1L;
            if (desiredBundleSizeBytes > 0L) {
                numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
            }
            if (numSplits <= 0L) {
                LOG.warn("Number of splits is less than 0 ({}), fallback to 1", numSplits);
                numSplits = 1L;
            }
            if (minNumberOfSplits == null) {
                return numSplits;
            }
            return Math.max(numSplits, minNumberOfSplits.get().longValue());
        }

        /*
         * Loose catch block
         */
        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
            if (this.estimatedSize != null) {
                return this.estimatedSize;
            }
            Throwable throwable = null;
            try (Cluster cluster = CassandraIO.getCluster((ValueProvider<List<String>>)this.spec.hosts(), (ValueProvider<Integer>)this.spec.port(), (ValueProvider<String>)this.spec.username(), (ValueProvider<String>)this.spec.password(), (ValueProvider<String>)this.spec.localDc(), (ValueProvider<String>)this.spec.consistencyLevel(), (ValueProvider<Integer>)this.spec.connectTimeout(), (ValueProvider<Integer>)this.spec.readTimeout());){
                if (CassandraSource.isMurmur3Partitioner(cluster)) {
                    try {
                        List<TokenRange> tokenRanges = CassandraSource.getTokenRanges(cluster, (String)this.spec.keyspace().get(), (String)this.spec.table().get());
                        this.estimatedSize = CassandraSource.getEstimatedSizeBytesFromTokenRanges(tokenRanges);
                        long l = this.estimatedSize;
                        return l;
                    }
                    catch (Exception e) {
                        long l;
                        block12: {
                            LOG.warn("Can't estimate the size", (Throwable)e);
                            l = 0L;
                            if (cluster == null) break block12;
                            CassandraSource.$closeResource(throwable, (AutoCloseable)cluster);
                        }
                        return l;
                    }
                }
                LOG.warn("Only Murmur3 partitioner is supported, can't estimate the size");
                long l = 0L;
                return l;
                {
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    catch (Throwable throwable3) {
                        throw throwable3;
                    }
                }
            }
        }

        /**
         * Sums {@code meanPartitionSize * partitionCount} over the given ranges and divides the
         * total by the fraction of the ring those ranges cover, rounding to the nearest long.
         */
        @VisibleForTesting
        static long getEstimatedSizeBytesFromTokenRanges(List<TokenRange> tokenRanges) {
            long totalBytes = 0L;
            for (TokenRange range : tokenRanges) {
                totalBytes += range.meanPartitionSize * range.partitionCount;
            }
            double ringFraction = getRingFraction(tokenRanges);
            return Math.round(totalBytes / ringFraction);
        }

        /**
         * Adds the spec's hosts, port, keyspace, table, username, local DC and consistency level
         * to the display data; hosts and port are only added when set.
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            if (spec.hosts() != null) {
                builder.add(DisplayData.item("hosts", spec.hosts().toString()));
            }
            if (spec.port() != null) {
                builder.add(DisplayData.item("port", spec.port()));
            }
            builder.addIfNotNull(DisplayData.item("keyspace", spec.keyspace()));
            builder.addIfNotNull(DisplayData.item("table", spec.table()));
            builder.addIfNotNull(DisplayData.item("username", spec.username()));
            builder.addIfNotNull(DisplayData.item("localDc", spec.localDc()));
            builder.addIfNotNull(DisplayData.item("consistencyLevel", spec.consistencyLevel()));
        }

        /**
         * Reads the rows of {@code system.size_estimates} for the given keyspace and table and
         * converts each one into a {@link TokenRange}. Uses a short-lived session that is closed
         * before returning.
         */
        private static List<TokenRange> getTokenRanges(Cluster cluster, String keyspace, String table) {
            try (Session session = cluster.newSession()) {
                ResultSet rows = session.execute("SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.size_estimates WHERE keyspace_name = ? AND table_name = ?", keyspace, table);
                List<TokenRange> ranges = new ArrayList<>();
                for (Row row : rows) {
                    long partitionCount = row.getLong("partitions_count");
                    long meanPartitionSize = row.getLong("mean_partition_size");
                    BigInteger start = new BigInteger(row.getString("range_start"));
                    BigInteger end = new BigInteger(row.getString("range_end"));
                    ranges.add(new TokenRange(partitionCount, meanPartitionSize, start, end));
                }
                return ranges;
            }
        }

        /**
         * Returns the sum, over the given ranges, of each range's token distance divided by the
         * Murmur3 ring size reported by {@link SplitGenerator#getRangeSize}.
         */
        @VisibleForTesting
        static double getRingFraction(List<TokenRange> tokenRanges) {
            double fraction = 0.0;
            for (TokenRange range : tokenRanges) {
                double covered = distance(range.rangeStart, range.rangeEnd).doubleValue();
                double ringSize = SplitGenerator.getRangeSize(MURMUR3PARTITIONER).doubleValue();
                fraction += covered / ringSize;
            }
            return fraction;
        }

        /** Returns whether the cluster metadata reports the Murmur3 partitioner class name. */
        @VisibleForTesting
        static boolean isMurmur3Partitioner(Cluster cluster) {
            String partitioner = cluster.getMetadata().getPartitioner();
            return MURMUR3PARTITIONER.equals(partitioner);
        }

        /**
         * Returns {@code right - left}, adding the Murmur3 ring size when {@code right} is not
         * strictly greater than {@code left} (i.e. the range wraps around the ring).
         */
        @VisibleForTesting
        static BigInteger distance(BigInteger left, BigInteger right) {
            BigInteger difference = right.subtract(left);
            if (right.compareTo(left) > 0) {
                return difference;
            }
            return difference.add(SplitGenerator.getRangeSize(MURMUR3PARTITIONER));
        }

        private class CassandraReader
        extends BoundedSource.BoundedReader<T> {
            private final CassandraSource<T> source;
            private Cluster cluster;
            private Session session;
            private Iterator<T> iterator;
            private T current;

            CassandraReader(CassandraSource<T> source) {
                this.source = source;
            }

            public boolean start() {
                LOG.debug("Starting Cassandra reader");
                this.cluster = CassandraIO.getCluster((ValueProvider<List<String>>)this.source.spec.hosts(), (ValueProvider<Integer>)this.source.spec.port(), (ValueProvider<String>)this.source.spec.username(), (ValueProvider<String>)this.source.spec.password(), (ValueProvider<String>)this.source.spec.localDc(), (ValueProvider<String>)this.source.spec.consistencyLevel(), (ValueProvider<Integer>)this.source.spec.connectTimeout(), (ValueProvider<Integer>)this.source.spec.readTimeout());
                this.session = this.cluster.connect((String)this.source.spec.keyspace().get());
                LOG.debug("Queries: " + this.source.splitQueries);
                ArrayList<ResultSetFuture> futures = new ArrayList<ResultSetFuture>();
                for (String query : this.source.splitQueries) {
                    futures.add(this.session.executeAsync(query));
                }
                Mapper mapper = this.getMapper(this.session, this.source.spec.entity());
                for (ResultSetFuture result : futures) {
                    if (this.iterator == null) {
                        this.iterator = mapper.map(result.getUninterruptibly());
                        continue;
                    }
                    this.iterator = Iterators.concat(this.iterator, mapper.map(result.getUninterruptibly()));
                }
                return this.advance();
            }

            public boolean advance() {
                if (this.iterator.hasNext()) {
                    this.current = this.iterator.next();
                    return true;
                }
                this.current = null;
                return false;
            }

            public void close() {
                LOG.debug("Closing Cassandra reader");
                if (this.session != null) {
                    this.session.close();
                }
                if (this.cluster != null) {
                    this.cluster.close();
                }
            }

            /**
             * Returns the element most recently stored by {@code advance()}.
             *
             * @return the current element, never {@code null}
             * @throws NoSuchElementException if no current element is set
             */
            public T getCurrent() throws NoSuchElementException {
                final T element = this.current;
                if (element != null) {
                    return element;
                }
                throw new NoSuchElementException();
            }

            /**
             * Returns the source stored in this reader's {@code source} field.
             *
             * @return the {@code CassandraSource} this reader reads from
             */
            public CassandraSource<T> getCurrentSource() {
                final CassandraSource<T> owningSource = this.source;
                return owningSource;
            }

            /**
             * Builds the row mapper by applying the mapper factory configured on the read spec
             * to the given session.
             *
             * @param session the session handed to the mapper factory
             * @param enitity not used by this method
             * @return the mapper produced by the factory
             */
            private Mapper<T> getMapper(Session session, Class<T> enitity) {
                SerializableFunction<Session, Mapper> mapperFactory = this.source.spec.mapperFactoryFn();
                Mapper mapper = mapperFactory.apply(session);
                return mapper;
            }
        }

        /**
         * Immutable holder for a token range: its start and end tokens together with a
         * partition count and a mean partition size.
         */
        @VisibleForTesting
        static class TokenRange {
            private final long partitionCount;
            private final long meanPartitionSize;
            private final BigInteger rangeStart;
            private final BigInteger rangeEnd;

            /**
             * Creates a token range holder; all arguments are stored as given.
             *
             * @param partitionCount value stored as the partition count
             * @param meanPartitionSize value stored as the mean partition size
             * @param rangeStart start token of the range
             * @param rangeEnd end token of the range
             */
            TokenRange(long partitionCount, long meanPartitionSize, BigInteger rangeStart, BigInteger rangeEnd) {
                // Range bounds first, then the statistics; the assignments are independent.
                this.rangeStart = rangeStart;
                this.rangeEnd = rangeEnd;
                this.partitionCount = partitionCount;
                this.meanPartitionSize = meanPartitionSize;
            }
        }
    }

    /**
     * A {@code PTransform} that reads entities of type {@code T} from Cassandra.
     *
     * <p>Instances are immutable: every {@code withXxx} method returns a new {@code Read} built
     * from this one with a single property replaced. Most properties come in two flavours, one
     * taking a plain value (validated eagerly and wrapped in a {@code StaticValueProvider}) and
     * one taking a {@code ValueProvider} (stored as given, without validation).
     *
     * <p>{@link #expand} requires hosts, port, keyspace, table, entity and coder to be set.
     *
     * @param <T> type of the elements produced by the read
     */
    @AutoValue
    public static abstract class Read<T>
    extends PTransform<PBegin, PCollection<T>> {
        @Nullable
        abstract ValueProvider<List<String>> hosts();

        @Nullable
        abstract ValueProvider<String> query();

        @Nullable
        abstract ValueProvider<Integer> port();

        @Nullable
        abstract ValueProvider<String> keyspace();

        @Nullable
        abstract ValueProvider<String> table();

        @Nullable
        abstract Class<T> entity();

        @Nullable
        abstract Coder<T> coder();

        @Nullable
        abstract ValueProvider<String> username();

        @Nullable
        abstract ValueProvider<String> password();

        @Nullable
        abstract ValueProvider<String> localDc();

        @Nullable
        abstract ValueProvider<String> consistencyLevel();

        @Nullable
        abstract ValueProvider<Integer> minNumberOfSplits();

        @Nullable
        abstract ValueProvider<Integer> connectTimeout();

        @Nullable
        abstract ValueProvider<Integer> readTimeout();

        @Nullable
        abstract SerializableFunction<Session, Mapper> mapperFactoryFn();

        /** Returns a builder pre-populated with this instance's properties. */
        abstract Builder<T> builder();

        /**
         * Sets the Cassandra hosts.
         *
         * @param hosts non-null, non-empty list of hosts
         * @throws IllegalArgumentException if {@code hosts} is null or empty
         */
        public Read<T> withHosts(List<String> hosts) {
            Preconditions.checkArgument(hosts != null, "hosts can not be null");
            Preconditions.checkArgument(!hosts.isEmpty(), "hosts can not be empty");
            return withHosts(ValueProvider.StaticValueProvider.of(hosts));
        }

        /** Sets the Cassandra hosts from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withHosts(ValueProvider<List<String>> hosts) {
            return builder().setHosts(hosts).build();
        }

        /**
         * Sets the Cassandra port.
         *
         * @param port port number, must be greater than 0
         * @throws IllegalArgumentException if {@code port} is not positive
         */
        public Read<T> withPort(int port) {
            Preconditions.checkArgument(port > 0, "port must be > 0, but was: %s", port);
            return withPort(ValueProvider.StaticValueProvider.of(port));
        }

        /** Sets the Cassandra port from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withPort(ValueProvider<Integer> port) {
            return builder().setPort(port).build();
        }

        /**
         * Sets the keyspace to connect to.
         *
         * @throws IllegalArgumentException if {@code keyspace} is null
         */
        public Read<T> withKeyspace(String keyspace) {
            Preconditions.checkArgument(keyspace != null, "keyspace can not be null");
            return withKeyspace(ValueProvider.StaticValueProvider.of(keyspace));
        }

        /** Sets the keyspace from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withKeyspace(ValueProvider<String> keyspace) {
            return builder().setKeyspace(keyspace).build();
        }

        /**
         * Sets the table to read from.
         *
         * @throws IllegalArgumentException if {@code table} is null
         */
        public Read<T> withTable(String table) {
            Preconditions.checkArgument(table != null, "table can not be null");
            return withTable(ValueProvider.StaticValueProvider.of(table));
        }

        /** Sets the table from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withTable(ValueProvider<String> table) {
            return builder().setTable(table).build();
        }

        /**
         * Sets the query to use for the read.
         *
         * @param query non-null, non-empty query string
         * @throws IllegalArgumentException if {@code query} is null or empty
         */
        public Read<T> withQuery(String query) {
            // The check rejects empty strings as well as null, so the message says so.
            Preconditions.checkArgument(query != null && query.length() > 0, "query cannot be null or empty");
            return withQuery(ValueProvider.StaticValueProvider.of(query));
        }

        /** Sets the query from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withQuery(ValueProvider<String> query) {
            return builder().setQuery(query).build();
        }

        /**
         * Sets the entity class of the elements to read.
         *
         * <p>If no mapper factory has been set, building the result also installs a
         * {@code DefaultObjectMapperFactory} for this entity (see {@code Builder.build()}).
         *
         * @throws IllegalArgumentException if {@code entity} is null
         */
        public Read<T> withEntity(Class<T> entity) {
            Preconditions.checkArgument(entity != null, "entity can not be null");
            return builder().setEntity(entity).build();
        }

        /**
         * Sets the coder for the output elements.
         *
         * @throws IllegalArgumentException if {@code coder} is null
         */
        public Read<T> withCoder(Coder<T> coder) {
            Preconditions.checkArgument(coder != null, "coder can not be null");
            return builder().setCoder(coder).build();
        }

        /**
         * Sets the username used when connecting.
         *
         * @throws IllegalArgumentException if {@code username} is null
         */
        public Read<T> withUsername(String username) {
            Preconditions.checkArgument(username != null, "username can not be null");
            return withUsername(ValueProvider.StaticValueProvider.of(username));
        }

        /** Sets the username from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withUsername(ValueProvider<String> username) {
            return builder().setUsername(username).build();
        }

        /**
         * Sets the password used when connecting.
         *
         * @throws IllegalArgumentException if {@code password} is null
         */
        public Read<T> withPassword(String password) {
            Preconditions.checkArgument(password != null, "password can not be null");
            return withPassword(ValueProvider.StaticValueProvider.of(password));
        }

        /** Sets the password from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withPassword(ValueProvider<String> password) {
            return builder().setPassword(password).build();
        }

        /**
         * Sets the local datacenter name.
         *
         * @throws IllegalArgumentException if {@code localDc} is null
         */
        public Read<T> withLocalDc(String localDc) {
            Preconditions.checkArgument(localDc != null, "localDc can not be null");
            return withLocalDc(ValueProvider.StaticValueProvider.of(localDc));
        }

        /** Sets the local datacenter from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withLocalDc(ValueProvider<String> localDc) {
            return builder().setLocalDc(localDc).build();
        }

        /**
         * Sets the consistency level, given by name.
         *
         * @throws IllegalArgumentException if {@code consistencyLevel} is null
         */
        public Read<T> withConsistencyLevel(String consistencyLevel) {
            Preconditions.checkArgument(consistencyLevel != null, "consistencyLevel can not be null");
            return withConsistencyLevel(ValueProvider.StaticValueProvider.of(consistencyLevel));
        }

        /** Sets the consistency level from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withConsistencyLevel(ValueProvider<String> consistencyLevel) {
            return builder().setConsistencyLevel(consistencyLevel).build();
        }

        /**
         * Sets the minimum number of splits.
         *
         * @param minNumberOfSplits non-null value greater than 0
         * @throws IllegalArgumentException if the value is null or not positive
         */
        public Read<T> withMinNumberOfSplits(Integer minNumberOfSplits) {
            Preconditions.checkArgument(minNumberOfSplits != null, "minNumberOfSplits can not be null");
            Preconditions.checkArgument(minNumberOfSplits > 0, "minNumberOfSplits must be greater than 0");
            return withMinNumberOfSplits(ValueProvider.StaticValueProvider.of(minNumberOfSplits));
        }

        /** Sets the minimum number of splits from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withMinNumberOfSplits(ValueProvider<Integer> minNumberOfSplits) {
            return builder().setMinNumberOfSplits(minNumberOfSplits).build();
        }

        /**
         * Sets the connect timeout.
         *
         * @param timeout non-null value greater than 0 (unit not established here;
         *     presumably milliseconds - confirm against the cluster setup code)
         * @throws IllegalArgumentException if the value is null or not positive
         */
        public Read<T> withConnectTimeout(Integer timeout) {
            Preconditions.checkArgument(timeout != null, "Connect timeout can not be null");
            Preconditions.checkArgument(timeout > 0, "Connect timeout must be > 0, but was: %s", timeout);
            return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
        }

        /** Sets the connect timeout from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {
            return builder().setConnectTimeout(timeout).build();
        }

        /**
         * Sets the read timeout.
         *
         * @param timeout non-null value greater than 0 (unit not established here;
         *     presumably milliseconds - confirm against the cluster setup code)
         * @throws IllegalArgumentException if the value is null or not positive
         */
        public Read<T> withReadTimeout(Integer timeout) {
            Preconditions.checkArgument(timeout != null, "Read timeout can not be null");
            Preconditions.checkArgument(timeout > 0, "Read timeout must be > 0, but was: %s", timeout);
            return withReadTimeout(ValueProvider.StaticValueProvider.of(timeout));
        }

        /** Sets the read timeout from a {@code ValueProvider}; the provider is not validated. */
        public Read<T> withReadTimeout(ValueProvider<Integer> timeout) {
            return builder().setReadTimeout(timeout).build();
        }

        /**
         * Sets the factory that creates the {@code Mapper} from a {@code Session}, replacing
         * any previously configured (or default) factory.
         *
         * @throws IllegalArgumentException if {@code mapperFactory} is null
         */
        public Read<T> withMapperFactoryFn(SerializableFunction<Session, Mapper> mapperFactory) {
            Preconditions.checkArgument(mapperFactory != null, "CassandraIO.withMapperFactory(withMapperFactory) called with null value");
            return builder().setMapperFactoryFn(mapperFactory).build();
        }

        /**
         * Validates that all mandatory properties are set and applies a bounded read over a
         * {@code CassandraSource} created from this spec.
         *
         * @throws IllegalArgumentException if hosts, port, keyspace, table, entity or coder
         *     is missing
         */
        @Override
        public PCollection<T> expand(PBegin input) {
            Preconditions.checkArgument(hosts() != null && port() != null, "WithHosts() and withPort() are required");
            Preconditions.checkArgument(keyspace() != null, "withKeyspace() is required");
            Preconditions.checkArgument(table() != null, "withTable() is required");
            Preconditions.checkArgument(entity() != null, "withEntity() is required");
            Preconditions.checkArgument(coder() != null, "withCoder() is required");
            // The second constructor argument is passed as null here, exactly as before.
            return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<T>(this, null)));
        }

        /** AutoValue builder for {@code Read}. */
        @AutoValue.Builder
        static abstract class Builder<T> {
            Builder() {
            }

            abstract Builder<T> setHosts(ValueProvider<List<String>> var1);

            abstract Builder<T> setQuery(ValueProvider<String> var1);

            abstract Builder<T> setPort(ValueProvider<Integer> var1);

            abstract Builder<T> setKeyspace(ValueProvider<String> var1);

            abstract Builder<T> setTable(ValueProvider<String> var1);

            abstract Builder<T> setEntity(Class<T> var1);

            /** Returns the entity class set so far, if any. */
            abstract Optional<Class<T>> entity();

            abstract Builder<T> setCoder(Coder<T> var1);

            abstract Builder<T> setUsername(ValueProvider<String> var1);

            abstract Builder<T> setPassword(ValueProvider<String> var1);

            abstract Builder<T> setLocalDc(ValueProvider<String> var1);

            abstract Builder<T> setConsistencyLevel(ValueProvider<String> var1);

            abstract Builder<T> setMinNumberOfSplits(ValueProvider<Integer> var1);

            abstract Builder<T> setConnectTimeout(ValueProvider<Integer> var1);

            abstract Builder<T> setReadTimeout(ValueProvider<Integer> var1);

            abstract Builder<T> setMapperFactoryFn(SerializableFunction<Session, Mapper> var1);

            /** Returns the mapper factory set so far, if any. */
            abstract Optional<SerializableFunction<Session, Mapper>> mapperFactoryFn();

            /** Generated build method; callers should use {@link #build()} instead. */
            abstract Read<T> autoBuild();

            /**
             * Builds the {@code Read}. When no mapper factory has been set but an entity class
             * has, a {@code DefaultObjectMapperFactory} for that entity is installed first.
             */
            public Read<T> build() {
                if (!mapperFactoryFn().isPresent() && entity().isPresent()) {
                    setMapperFactoryFn(new DefaultObjectMapperFactory<T>(entity().get()));
                }
                return autoBuild();
            }
        }
    }
}

