/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_io_cassandra.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_sdks_java_io_cassandra.com.google.common.collect.Iterators;
import org.apache.beam.repackaged.beam_sdks_java_io_cassandra.com.google.common.util.concurrent.ListenableFuture;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.CassandraService;
import org.apache.beam.sdk.io.cassandra.RingRange;
import org.apache.beam.sdk.io.cassandra.SplitGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraServiceImpl<T>
implements CassandraService<T> {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(CassandraServiceImpl.class);
    // Partitioner class name compared against the cluster metadata in isMurmur3Partitioner;
    // size estimation and splitting fall back to a degraded path for any other partitioner.
    private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";

    /**
     * Builds a reader bound to the given source.
     *
     * <p>No connection is opened here; the returned reader connects when its {@code start()}
     * method is called.
     *
     * @param source the source whose split queries the reader will execute
     * @return a new, not yet started reader
     */
    public CassandraReaderImpl createReader(CassandraIO.CassandraSource<T> source) {
        CassandraReaderImpl reader = new CassandraReaderImpl(source);
        return reader;
    }

    /*
     * Loose catch block
     */
    @Override
    public long getEstimatedSizeBytes(CassandraIO.Read<T> spec) {
        Throwable throwable = null;
        try (Cluster cluster = this.getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), spec.localDc(), spec.consistencyLevel());){
            if (CassandraServiceImpl.isMurmur3Partitioner(cluster)) {
                try {
                    List<TokenRange> tokenRanges = CassandraServiceImpl.getTokenRanges(cluster, spec.keyspace(), spec.table());
                    long l = CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges);
                    return l;
                }
                catch (Exception e) {
                    long l;
                    block11: {
                        LOG.warn("Can't estimate the size", (Throwable)e);
                        l = 0L;
                        if (cluster == null) break block11;
                        CassandraServiceImpl.$closeResource(throwable, (AutoCloseable)cluster);
                    }
                    return l;
                }
            }
            LOG.warn("Only Murmur3 partitioner is supported, can't estimate the size");
            long l = 0L;
            return l;
            {
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                catch (Throwable throwable3) {
                    throw throwable3;
                }
            }
        }
    }

    /**
     * Extrapolates a whole-table size from the given token ranges.
     *
     * <p>The per-range sizes ({@code meanPartitionSize * partitionCount}) are summed and then
     * divided by the fraction of the ring those ranges cover, as reported by
     * {@link #getRingFraction(List)}. The quotient is rounded to the nearest {@code long}.
     *
     * @param tokenRanges the size estimates to aggregate
     * @return the extrapolated size in bytes
     */
    @VisibleForTesting
    static long getEstimatedSizeBytes(List<TokenRange> tokenRanges) {
        long observedBytes = tokenRanges.stream()
                .mapToLong(range -> range.meanPartitionSize * range.partitionCount)
                .sum();
        double coveredFraction = CassandraServiceImpl.getRingFraction(tokenRanges);
        return Math.round((double) observedBytes / coveredFraction);
    }

    /**
     * Splits the read described by {@code spec} into bounded sources.
     *
     * <p>When the cluster uses the Murmur3 partitioner the work is delegated to the token-range
     * based overload, using the estimated table size. Otherwise a single source carrying one
     * unrestricted {@code SELECT} over the table is returned. The cluster handle opened here is
     * closed before the method returns.
     *
     * @param spec the read configuration
     * @param desiredBundleSizeBytes the target size of each split, in bytes
     * @return the sources covering the table
     */
    @Override
    public List<BoundedSource<T>> split(CassandraIO.Read<T> spec, long desiredBundleSizeBytes) {
        try (Cluster cluster = this.getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), spec.localDc(), spec.consistencyLevel())) {
            if (CassandraServiceImpl.isMurmur3Partitioner(cluster)) {
                LOG.info("Murmur3Partitioner detected, splitting");
                long estimatedSizeBytes = this.getEstimatedSizeBytes(spec);
                return this.split(spec, desiredBundleSizeBytes, estimatedSizeBytes, cluster);
            }
            LOG.warn("Only Murmur3Partitioner is supported for splitting, using an unique source for the read");
            String fullScanQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
            CassandraIO.CassandraSource<T> wholeTable = new CassandraIO.CassandraSource<T>(spec, Collections.singletonList(fullScanQuery));
            return Collections.<BoundedSource<T>>singletonList(wholeTable);
        }
    }

    /**
     * Generates one source per group of ring ranges produced by the {@link SplitGenerator}.
     *
     * <p>Each ring range becomes a {@code SELECT} restricted on {@code token(<partition key>)}.
     * A wrapping range is expressed as two queries: one bounded below by the range start and one
     * bounded above by the range end. A non-wrapping range is a single query with both bounds.
     *
     * @param spec the read configuration
     * @param desiredBundleSizeBytes the target size of each split, in bytes
     * @param estimatedSizeBytes the estimated size of the table, in bytes
     * @param cluster an open cluster handle used to read ring and schema metadata
     * @return the generated sources, one per split
     */
    private List<BoundedSource<T>> split(CassandraIO.Read<T> spec, long desiredBundleSizeBytes, long estimatedSizeBytes, Cluster cluster) {
        long numSplits = CassandraServiceImpl.getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes, spec.minNumberOfSplits());
        LOG.info("Number of desired splits is {}", numSplits);

        SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner());
        List<BigInteger> tokens = cluster.getMetadata().getTokenRanges().stream()
                .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
                .collect(Collectors.toList());
        List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, tokens);
        LOG.info("{} splits were actually generated", splits.size());

        String partitionKey = cluster.getMetadata()
                .getKeyspace(spec.keyspace())
                .getTable(spec.table())
                .getPartitionKey()
                .stream()
                .map(ColumnMetadata::getName)
                .collect(Collectors.joining(","));
        // Column expression shared by every generated restriction.
        String tokenColumn = "token(" + partitionKey + ")";

        List<BoundedSource<T>> sources = new ArrayList<>();
        for (List<RingRange> split : splits) {
            List<String> queries = new ArrayList<>();
            for (RingRange range : split) {
                Select.Where where = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
                if (range.isWrapping()) {
                    // First half: everything from the range start upwards.
                    String fromStart = where.and(QueryBuilder.gte(tokenColumn, range.getStart())).toString();
                    LOG.debug("Cassandra generated read query : {}", fromStart);
                    queries.add(fromStart);

                    // Second half: everything below the range end.
                    String upToEnd = QueryBuilder.select()
                            .from(spec.keyspace(), spec.table())
                            .where()
                            .and(QueryBuilder.lt(tokenColumn, range.getEnd()))
                            .toString();
                    LOG.debug("Cassandra generated read query : {}", upToEnd);
                    queries.add(upToEnd);
                } else {
                    String bounded = where
                            .and(QueryBuilder.gte(tokenColumn, range.getStart()))
                            .and(QueryBuilder.lt(tokenColumn, range.getEnd()))
                            .toString();
                    LOG.debug("Cassandra generated read query : {}", bounded);
                    queries.add(bounded);
                }
            }
            sources.add(new CassandraIO.CassandraSource<T>(spec, queries));
        }
        return sources;
    }

    /**
     * Computes how many splits to request.
     *
     * <p>The base value is {@code estimatedSizeBytes / desiredBundleSizeBytes} (integer division)
     * when the desired bundle size is positive, and {@code 1} otherwise. A non-positive result is
     * replaced by {@code 1} after logging a warning. When {@code minNumberOfSplits} is provided,
     * the larger of the two values is returned.
     *
     * @param desiredBundleSizeBytes the target size of each split, in bytes
     * @param estimatedSizeBytes the estimated size of the table, in bytes
     * @param minNumberOfSplits optional lower bound on the number of splits, may be {@code null}
     * @return the number of splits, always at least {@code 1}
     */
    private static long getNumSplits(long desiredBundleSizeBytes, long estimatedSizeBytes, @Nullable Integer minNumberOfSplits) {
        long numSplits = 1L;
        if (desiredBundleSizeBytes > 0L) {
            numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
        }
        if (numSplits <= 0L) {
            LOG.warn("Number of splits is less than 0 ({}), fallback to 1", numSplits);
            numSplits = 1L;
        }
        if (minNumberOfSplits == null) {
            return numSplits;
        }
        return Math.max(numSplits, minNumberOfSplits.longValue());
    }

    /**
     * Builds a {@link Cluster} from the given connection settings.
     *
     * <p>Plain-text authentication is configured only when {@code username} is non-null. The load
     * balancing policy is always a token-aware wrapper around a DC-aware round-robin policy, with
     * the local data center set only when {@code localDc} is non-null. A consistency level is
     * applied only when {@code consistencyLevel} is non-null.
     *
     * @param hosts contact points
     * @param port port used for all contact points
     * @param username user name, or {@code null} for no authentication
     * @param password password used together with {@code username}
     * @param localDc local data center name, or {@code null}
     * @param consistencyLevel name of a {@link ConsistencyLevel} constant, or {@code null}
     * @return the built cluster; the caller is responsible for closing it
     */
    private Cluster getCluster(List<String> hosts, int port, String username, String password, String localDc, String consistencyLevel) {
        String[] contactPoints = hosts.toArray(new String[0]);
        Cluster.Builder clusterBuilder = Cluster.builder().addContactPoints(contactPoints).withPort(port);

        if (username != null) {
            AuthProvider authProvider = new PlainTextAuthProvider(username, password);
            clusterBuilder.withAuthProvider(authProvider);
        }

        DCAwareRoundRobinPolicy.Builder dcAwareBuilder = new DCAwareRoundRobinPolicy.Builder();
        if (localDc != null) {
            dcAwareBuilder.withLocalDc(localDc);
        }
        LoadBalancingPolicy dcAwarePolicy = dcAwareBuilder.build();
        clusterBuilder.withLoadBalancingPolicy(new TokenAwarePolicy(dcAwarePolicy));

        if (consistencyLevel != null) {
            QueryOptions queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
            clusterBuilder.withQueryOptions(queryOptions);
        }
        return clusterBuilder.build();
    }

    /**
     * Reads the rows of {@code system.size_estimates} for one table.
     *
     * <p>A session is opened on the given cluster for the query and closed before returning.
     *
     * @param cluster an open cluster handle
     * @param keyspace keyspace name bound to the query
     * @param table table name bound to the query
     * @return one {@link TokenRange} per returned row, in result-set order
     */
    private static List<TokenRange> getTokenRanges(Cluster cluster, String keyspace, String table) {
        try (Session session = cluster.newSession()) {
            ResultSet rows = session.execute("SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.size_estimates WHERE keyspace_name = ? AND table_name = ?", keyspace, table);
            List<TokenRange> tokenRanges = new ArrayList<>();
            for (Row row : rows) {
                long partitionsCount = row.getLong("partitions_count");
                long meanPartitionSize = row.getLong("mean_partition_size");
                // Range bounds are read as strings and parsed into BigInteger tokens.
                BigInteger rangeStart = new BigInteger(row.getString("range_start"));
                BigInteger rangeEnd = new BigInteger(row.getString("range_end"));
                tokenRanges.add(new TokenRange(partitionsCount, meanPartitionSize, rangeStart, rangeEnd));
            }
            return tokenRanges;
        }
    }

    /**
     * Computes the fraction of the Murmur3 ring covered by the given token ranges.
     *
     * <p>For every range, the {@link #distance(BigInteger, BigInteger)} between its start and end
     * is divided by the partitioner's range size; the per-range fractions are then added up.
     *
     * @param tokenRanges the ranges to measure
     * @return the summed fraction, {@code 0.0} for an empty list
     */
    @VisibleForTesting
    static double getRingFraction(List<TokenRange> tokenRanges) {
        double ringFraction = 0.0;
        Iterator<TokenRange> ranges = tokenRanges.iterator();
        while (ranges.hasNext()) {
            TokenRange range = ranges.next();
            double span = CassandraServiceImpl.distance(range.rangeStart, range.rangeEnd).doubleValue();
            double ringSize = SplitGenerator.getRangeSize(MURMUR3PARTITIONER).doubleValue();
            ringFraction += span / ringSize;
        }
        return ringFraction;
    }

    /**
     * Measures the distance from {@code left} to {@code right} going forward around the ring.
     *
     * <p>When {@code right} is strictly greater than {@code left} this is the plain difference.
     * Otherwise the Murmur3 range size is added to the (non-positive) difference, which accounts
     * for a range that wraps around the end of the ring.
     *
     * @param left start token
     * @param right end token
     * @return the forward distance between the two tokens
     */
    @VisibleForTesting
    static BigInteger distance(BigInteger left, BigInteger right) {
        BigInteger delta = right.subtract(left);
        if (delta.signum() > 0) {
            return delta;
        }
        return delta.add(SplitGenerator.getRangeSize(MURMUR3PARTITIONER));
    }

    /**
     * Tells whether the cluster reports the Murmur3 partitioner in its metadata.
     *
     * @param cluster the cluster whose metadata is inspected
     * @return {@code true} when the reported partitioner equals the Murmur3 partitioner class name
     */
    @VisibleForTesting
    static boolean isMurmur3Partitioner(Cluster cluster) {
        String partitioner = cluster.getMetadata().getPartitioner();
        return MURMUR3PARTITIONER.equals(partitioner);
    }

    /**
     * Builds a writer for the given write configuration.
     *
     * <p>Constructing the writer connects to the cluster, so the caller must close the returned
     * writer.
     *
     * @param spec the write configuration
     * @return a connected writer
     */
    @Override
    public CassandraService.Writer createWriter(CassandraIO.Write<T> spec) {
        WriterImpl writer = new WriterImpl(spec);
        return writer;
    }

    /**
     * Writer that saves entities asynchronously through the DataStax object mapper.
     *
     * <p>Writes are issued with {@code saveAsync} and their futures collected; once
     * {@link #CONCURRENT_ASYNC_QUERIES} futures are pending, {@link #write(Object)} blocks until
     * all of them complete. {@link #close()} waits for any remaining futures and then releases
     * the session and the cluster.
     */
    protected class WriterImpl
    implements CassandraService.Writer<T> {
        /** Maximum number of in-flight asynchronous writes before {@code write} blocks. */
        private static final int CONCURRENT_ASYNC_QUERIES = 100;
        private final CassandraIO.Write<T> spec;
        private final Cluster cluster;
        private final Session session;
        private final MappingManager mappingManager;
        /** Futures of the writes issued since the last flush. */
        private List<ListenableFuture<Void>> writeFutures;

        /**
         * Connects to the cluster and keyspace described by {@code spec}.
         *
         * @param spec the write configuration
         */
        WriterImpl(CassandraIO.Write<T> spec) {
            this.spec = spec;
            Cluster newCluster = CassandraServiceImpl.this.getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), spec.localDc(), spec.consistencyLevel());
            Session newSession;
            try {
                newSession = newCluster.connect(spec.keyspace());
            } catch (RuntimeException e) {
                // Nobody else holds the cluster yet, so release it before propagating the failure.
                newCluster.close();
                throw e;
            }
            this.cluster = newCluster;
            this.session = newSession;
            this.mappingManager = new MappingManager(this.session);
            this.writeFutures = new ArrayList<>();
        }

        /**
         * Queues an asynchronous save of {@code entity}, flushing when the batch is full.
         *
         * @param entity the entity to save
         * @throws ExecutionException if one of the flushed writes failed
         * @throws InterruptedException if interrupted while waiting for the flushed writes
         */
        @Override
        public void write(T entity) throws ExecutionException, InterruptedException {
            @SuppressWarnings("unchecked") // entity is a T, so the mapper for its runtime class accepts T
            Mapper<T> mapper = (Mapper<T>) this.mappingManager.mapper(entity.getClass());
            this.writeFutures.add(mapper.saveAsync(entity));
            // ">=" rather than "==": if an earlier flush threw, the list was not reset and may
            // already be past the threshold.
            if (this.writeFutures.size() >= CONCURRENT_ASYNC_QUERIES) {
                LOG.debug("Waiting for a batch of {} Cassandra writes to be executed...", CONCURRENT_ASYNC_QUERIES);
                this.waitForFuturesToFinish();
                this.writeFutures = new ArrayList<>();
            }
        }

        /**
         * Waits for pending writes, then closes the session and the cluster.
         *
         * <p>The session and cluster are closed even when waiting for the pending writes fails.
         *
         * @throws ExecutionException if one of the pending writes failed
         * @throws InterruptedException if interrupted while waiting for the pending writes
         */
        @Override
        public void close() throws ExecutionException, InterruptedException {
            try {
                if (!this.writeFutures.isEmpty()) {
                    this.waitForFuturesToFinish();
                }
            } finally {
                try {
                    if (this.session != null) {
                        this.session.close();
                    }
                } finally {
                    if (this.cluster != null) {
                        this.cluster.close();
                    }
                }
            }
        }

        /** Blocks until every collected future has completed, propagating the first failure. */
        private void waitForFuturesToFinish() throws ExecutionException, InterruptedException {
            for (ListenableFuture<Void> future : this.writeFutures) {
                future.get();
            }
        }
    }

    /**
     * Immutable holder for one size-estimate entry: a token range together with the number of
     * partitions and the mean partition size recorded for it.
     */
    @VisibleForTesting
    static class TokenRange {
        // Bounds of the range on the token ring.
        private final BigInteger rangeStart;
        private final BigInteger rangeEnd;
        // Size figures recorded for the range.
        private final long partitionCount;
        private final long meanPartitionSize;

        /**
         * Creates a range entry.
         *
         * @param partitionCount number of partitions recorded for the range
         * @param meanPartitionSize mean partition size recorded for the range
         * @param rangeStart start token of the range
         * @param rangeEnd end token of the range
         */
        public TokenRange(long partitionCount, long meanPartitionSize, BigInteger rangeStart, BigInteger rangeEnd) {
            this.rangeStart = rangeStart;
            this.rangeEnd = rangeEnd;
            this.partitionCount = partitionCount;
            this.meanPartitionSize = meanPartitionSize;
        }
    }

    /**
     * Reader that executes the split queries of a source and exposes the mapped rows one by one.
     *
     * <p>{@link #start()} opens the cluster and session, submits every split query
     * asynchronously and chains the mapped results into a single iterator. {@link #close()}
     * releases the session and the cluster.
     */
    private class CassandraReaderImpl
    extends BoundedSource.BoundedReader<T> {
        private final CassandraIO.CassandraSource<T> source;
        private Cluster cluster;
        private Session session;
        /** Concatenation of the mapped results of every split query; set by {@code start()}. */
        private Iterator<T> iterator;
        /** Element returned by {@code getCurrent()}, or {@code null} when there is none. */
        private T current;

        /**
         * Creates a reader for the given source; no connection is opened until {@link #start()}.
         *
         * @param source the source providing the connection settings and split queries
         */
        CassandraReaderImpl(CassandraIO.CassandraSource<T> source) {
            this.source = source;
        }

        /**
         * Connects, runs all split queries and positions the reader on the first element.
         *
         * @return {@code true} if a first element is available
         */
        public boolean start() {
            LOG.debug("Starting Cassandra reader");
            this.cluster = CassandraServiceImpl.this.getCluster(this.source.spec.hosts(), this.source.spec.port(), this.source.spec.username(), this.source.spec.password(), this.source.spec.localDc(), this.source.spec.consistencyLevel());
            this.session = this.cluster.connect();
            LOG.debug("Queries: {}", this.source.splitQueries);

            // Submit every query before waiting on any of them.
            List<ResultSetFuture> futures = new ArrayList<>();
            for (String query : this.source.splitQueries) {
                futures.add(this.session.executeAsync(query));
            }

            MappingManager mappingManager = new MappingManager(this.session);
            @SuppressWarnings("unchecked") // the mapper is built for the entity class of this source
            Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(this.source.spec.entity());

            // Seed with an empty iterator so a source without queries yields no elements
            // instead of leaving the iterator null.
            Iterator<T> rows = Collections.emptyIterator();
            for (ResultSetFuture result : futures) {
                rows = Iterators.concat(rows, mapper.map(result.getUninterruptibly()).iterator());
            }
            this.iterator = rows;
            return this.advance();
        }

        /**
         * Moves to the next element.
         *
         * @return {@code true} if an element is available, {@code false} once exhausted
         */
        public boolean advance() {
            if (this.iterator.hasNext()) {
                this.current = this.iterator.next();
                return true;
            }
            this.current = null;
            return false;
        }

        /** Closes the session and the cluster if they were opened. */
        public void close() {
            LOG.debug("Closing Cassandra reader");
            if (this.session != null) {
                this.session.close();
            }
            if (this.cluster != null) {
                this.cluster.close();
            }
        }

        /**
         * Returns the element the reader is positioned on.
         *
         * @return the current element
         * @throws NoSuchElementException if there is no current element
         */
        public T getCurrent() throws NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        /**
         * Returns the source this reader was created from.
         *
         * @return the source passed to the constructor
         */
        public CassandraIO.CassandraSource<T> getCurrentSource() {
            return this.source;
        }
    }
}

