/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Computed;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import info.archinnov.achilles.embedded.CassandraEmbeddedServerBuilder;
import info.archinnov.achilles.embedded.CassandraShutDownHook;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import junit.framework.TestCase;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.Mapper;
import org.apache.beam.sdk.io.cassandra.RingRange;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.cassandra.service.StorageServiceMBean;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class CassandraIOTest
implements Serializable {
    // Number of rows expected in the "scientist" table; insertData() inserts this many (22).
    private static final long NUM_ROWS = 22L;
    // Keyspace created by the embedded server builder and used by every query in this class.
    private static final String CASSANDRA_KEYSPACE = "beam_ks";
    // Host used for both the CQL connection and the JMX service URL.
    private static final String CASSANDRA_HOST = "127.0.0.1";
    // Table read by most read tests and targeted by testDelete.
    private static final String CASSANDRA_TABLE = "scientist";
    // Table holding the 100 (id, data) rows read by testWrapAroundRingRanges.
    private static final String CASSANDRA_TABLE_SIMPLEDATA = "simpledata";
    private static final Logger LOG = LoggerFactory.getLogger(CassandraIOTest.class);
    // JMX object name used to obtain the StorageServiceMBean proxy.
    private static final String STORAGE_SERVICE_MBEAN = "org.apache.cassandra.db:type=StorageService";
    // Milliseconds; matches the sleep performed after flushing memtables.
    private static final int FLUSH_TIMEOUT = 30000;
    // Milliseconds; matches the sleep performed after disabling auto compaction.
    private static final int JMX_CONF_TIMEOUT = 1000;
    // Assigned in beforeClass() from NetworkTestHelper.getAvailableLocalPort().
    private static int jmxPort;
    // Assigned in beforeClass() from the cluster's protocol options.
    private static int cassandraPort;
    private static Cluster cluster;
    private static Session session;
    // Parent directory for the embedded server's data/commit-log/cdc/hints/saved-cache folders.
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER;
    // transient: the enclosing test class is Serializable, the pipeline rule is excluded from that.
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    private static CassandraShutDownHook shutdownHook;
    // Incremented by NOOPMapper; reset to 0 by the mapper tests before they run.
    private static final AtomicInteger counter;
    // Table written by testWrite through the ScientistWrite entity.
    private static final String CASSANDRA_TABLE_WRITE = "scientist_write";

    /**
     * Boots the embedded Cassandra server once for the whole class.
     *
     * <p>Picks a free JMX port, creates the server's working folders under
     * {@link #TEMPORARY_FOLDER}, builds the cluster, opens a session, loads the
     * fixture rows and finally disables auto compaction.
     *
     * @throws Exception if folder creation, server start-up or any JMX/CQL call fails
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        jmxPort = NetworkTestHelper.getAvailableLocalPort();
        shutdownHook = new CassandraShutDownHook();

        // Folder creation order is kept as-is: data, commit-log, cdc-raw, hints, saved-cache.
        String dataDir = TEMPORARY_FOLDER.newFolder("data").getAbsolutePath();
        String commitLogDir = TEMPORARY_FOLDER.newFolder("commit-log").getAbsolutePath();
        String cdcRawDir = TEMPORARY_FOLDER.newFolder("cdc-raw").getAbsolutePath();
        String hintsDir = TEMPORARY_FOLDER.newFolder("hints").getAbsolutePath();
        String savedCachesDir = TEMPORARY_FOLDER.newFolder("saved-cache").getAbsolutePath();
        Files.createDirectories(Paths.get(savedCachesDir));

        CassandraEmbeddedServerBuilder serverBuilder = CassandraEmbeddedServerBuilder.builder()
            .withKeyspaceName(CASSANDRA_KEYSPACE)
            .withDataFolder(dataDir)
            .withCommitLogFolder(commitLogDir)
            .withCdcRawFolder(cdcRawDir)
            .withHintsFolder(hintsDir)
            .withSavedCachesFolder(savedCachesDir)
            .withShutdownHook(shutdownHook)
            .withJMXPort(jmxPort)
            .cleanDataFilesAtStartup(false);

        cluster = buildCluster(serverBuilder);
        cassandraPort = cluster.getConfiguration().getProtocolOptions().getPort();
        session = cluster.newSession();

        insertData();
        disableAutoCompaction();
    }

    /**
     * Builds the native cluster from {@code builder}, retrying when no host is available.
     *
     * <p>Up to five attempts are made with a 5000 ms pause between attempts. The first
     * {@link NoHostAvailableException} becomes the cause of the final failure and later
     * ones are attached to it as suppressed exceptions.
     *
     * @param builder configured embedded-server builder
     * @return the cluster returned by the first successful {@code buildNativeCluster()} call
     * @throws RuntimeException if every attempt fails, or if the thread is interrupted while
     *     waiting between attempts (the interrupt flag is restored and the
     *     {@link InterruptedException} is kept as the cause)
     */
    private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) {
        final int maxAttempts = 5;
        final int delay = 5000;
        NoHostAvailableException exception = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return builder.buildNativeCluster();
            } catch (NoHostAvailableException e) {
                if (exception == null) {
                    exception = e;
                } else {
                    exception.addSuppressed(e);
                }
            }
            if (attempt == maxAttempts) {
                // No further attempt follows, so waiting again would only delay the failure.
                break;
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e1) {
                Thread thread = Thread.currentThread();
                thread.interrupt();
                // Keep the original exception as cause so the stack trace is not lost.
                throw new RuntimeException(String.format("Thread %s was interrupted", thread.getName()), e1);
            }
        }
        throw new RuntimeException(String.format("Unable to create embedded Cassandra cluster: tried %d times with %d delay", maxAttempts, delay), exception);
    }

    /**
     * Runs once after all tests and invokes {@code shutDownNow()} on the shutdown hook
     * that was registered with the embedded server builder in {@code beforeClass()}.
     */
    @AfterClass
    public static void afterClass() throws InterruptedException, IOException {
        shutdownHook.shutDownNow();
    }

    /**
     * Creates the three test tables and loads the fixture rows.
     *
     * <p>The {@code scientist} table receives {@link #NUM_ROWS} rows cycling through an
     * eleven-entry list of (department, name) pairs, using the loop index as
     * {@code person_id}. The {@code simpledata} table receives 100 rows. Afterwards
     * {@link #flushMemTablesAndRefreshSizeEstimates()} is called.
     *
     * @throws Exception if a CQL statement or the JMX flush fails
     */
    private static void insertData() throws Exception {
        LOG.info("Create Cassandra tables");
        // The read table and the write table share one schema.
        String scientistDdl = "CREATE TABLE IF NOT EXISTS %s.%s(person_department text, person_id int, person_name text, PRIMARY KEY((person_department), person_id));";
        session.execute(String.format(scientistDdl, CASSANDRA_KEYSPACE, CASSANDRA_TABLE));
        session.execute(String.format(scientistDdl, CASSANDRA_KEYSPACE, CASSANDRA_TABLE_WRITE));
        session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s(id int, data text, PRIMARY KEY (id))", CASSANDRA_KEYSPACE, CASSANDRA_TABLE_SIMPLEDATA));

        LOG.info("Insert records");
        String[][] scientists = {
            {"phys", "Einstein"},
            {"bio", "Darwin"},
            {"phys", "Copernicus"},
            {"bio", "Pasteur"},
            {"bio", "Curie"},
            {"phys", "Faraday"},
            {"math", "Newton"},
            {"phys", "Bohr"},
            {"phys", "Galileo"},
            {"math", "Maxwell"},
            {"logic", "Russel"},
        };
        for (int personId = 0; personId < NUM_ROWS; personId++) {
            String[] scientist = scientists[personId % scientists.length];
            String insertScientist = String.format("INSERT INTO %s.%s(person_department, person_id, person_name) values('" + scientist[0] + "', " + personId + ", '" + scientist[1] + "');", CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
            session.execute(insertScientist);
        }
        for (int id = 0; id < 100; id++) {
            String insertSimpleData = String.format("INSERT INTO %s.%s(id, data) VALUES(" + id + ",' data_" + id + "');", CASSANDRA_KEYSPACE, CASSANDRA_TABLE_SIMPLEDATA);
            session.execute(insertSimpleData);
        }
        flushMemTablesAndRefreshSizeEstimates();
    }

    /**
     * Connects to the embedded server over JMX, forces a flush of the {@code scientist}
     * table in the test keyspace, asks the storage service to refresh its size estimates,
     * and then sleeps for {@link #FLUSH_TIMEOUT} milliseconds.
     *
     * <p>The JMX connector is closed even when one of the MBean calls throws.
     *
     * @throws Exception if the JMX connection or any MBean operation fails, or if the
     *     sleep is interrupted
     */
    private static void flushMemTablesAndRefreshSizeEstimates() throws Exception {
        JMXServiceURL url = new JMXServiceURL(String.format("service:jmx:rmi://%s/jndi/rmi://%s:%s/jmxrmi", CASSANDRA_HOST, CASSANDRA_HOST, jmxPort));
        // try-with-resources: previously the connector leaked if an MBean call failed.
        try (JMXConnector jmxConnector = JMXConnectorFactory.connect(url, null)) {
            MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
            ObjectName objectName = new ObjectName(STORAGE_SERVICE_MBEAN);
            StorageServiceMBean mBeanProxy = JMX.newMBeanProxy(mBeanServerConnection, objectName, StorageServiceMBean.class);
            mBeanProxy.forceKeyspaceFlush(CASSANDRA_KEYSPACE, new String[]{CASSANDRA_TABLE});
            mBeanProxy.refreshSizeEstimates();
        }
        // Sleep happens after the connector is closed, as before.
        Thread.sleep(FLUSH_TIMEOUT);
    }

    /**
     * Connects to the embedded server over JMX, disables auto compaction for the
     * {@code scientist} table in the test keyspace, and then sleeps for
     * {@link #JMX_CONF_TIMEOUT} milliseconds.
     *
     * <p>The JMX connector is closed even when the MBean call throws.
     *
     * @throws Exception if the JMX connection or the MBean operation fails, or if the
     *     sleep is interrupted
     */
    private static void disableAutoCompaction() throws Exception {
        JMXServiceURL url = new JMXServiceURL(String.format("service:jmx:rmi://%s/jndi/rmi://%s:%s/jmxrmi", CASSANDRA_HOST, CASSANDRA_HOST, jmxPort));
        // try-with-resources: previously the connector leaked if the MBean call failed.
        try (JMXConnector jmxConnector = JMXConnectorFactory.connect(url, null)) {
            MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
            ObjectName objectName = new ObjectName(STORAGE_SERVICE_MBEAN);
            StorageServiceMBean mBeanProxy = JMX.newMBeanProxy(mBeanServerConnection, objectName, StorageServiceMBean.class);
            mBeanProxy.disableAutoCompaction(CASSANDRA_KEYSPACE, new String[]{CASSANDRA_TABLE});
        }
        // Sleep happens after the connector is closed, as before.
        Thread.sleep(JMX_CONF_TIMEOUT);
    }

    /**
     * Reads the {@code simpledata} table with a minimum of 50 splits and checks that the
     * global count adds up to the 100 rows inserted by {@code insertData()}.
     */
    @Test
    public void testWrapAroundRingRanges() throws Exception {
        PTransform readSimpleData = (PTransform)CassandraIO.read()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withTable(CASSANDRA_TABLE_SIMPLEDATA)
            .withMinNumberOfSplits(Integer.valueOf(50))
            .withCoder((Coder)SerializableCoder.of(SimpleData.class))
            .withEntity(SimpleData.class);
        PCollection rows = (PCollection)this.pipeline.apply(readSimpleData);
        PCollection counts = (PCollection)rows.apply("counting", Count.globally());
        PAssert.that((PCollection)counts).satisfies((SerializableFunction & Serializable)values -> {
            long sum = 0L;
            for (Long value : values) {
                sum += value.longValue();
            }
            Assert.assertEquals((long)100L, (long)sum);
            return null;
        });
        this.pipeline.run();
    }

    /**
     * Reads the whole {@code scientist} table and checks the total row count, then checks
     * that each of the 11 distinct scientist names occurs exactly twice.
     */
    @Test
    public void testRead() throws Exception {
        PTransform readScientists = (PTransform)CassandraIO.read()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withTable(CASSANDRA_TABLE)
            .withMinNumberOfSplits(Integer.valueOf(50))
            .withCoder((Coder)SerializableCoder.of(Scientist.class))
            .withEntity(Scientist.class);
        PCollection scientists = (PCollection)this.pipeline.apply(readScientists);

        PCollection total = (PCollection)scientists.apply("Count", Count.globally());
        PAssert.thatSingleton((PCollection)total).isEqualTo((Object)NUM_ROWS);

        // Key every row by scientist name so occurrences can be counted per name.
        PCollection byName = (PCollection)scientists.apply((PTransform)MapElements.via((SimpleFunction)new SimpleFunction<Scientist, KV<String, Integer>>(){

            public KV<String, Integer> apply(Scientist scientist) {
                return KV.of((Object)scientist.name, (Object)scientist.id);
            }
        }));
        PCollection perName = (PCollection)byName.apply("Count occurrences per scientist", Count.perKey());
        PAssert.that((PCollection)perName).satisfies((SerializableFunction & Serializable)input -> {
            int distinctNames = 0;
            for (KV element : input) {
                distinctNames++;
                Assert.assertEquals((String)((String)element.getKey()), (long)2L, (long)((Long)element.getValue()));
            }
            Assert.assertEquals((long)11L, (long)distinctNames);
            return null;
        });
        this.pipeline.run();
    }

    /**
     * Builds a {@code scientist}-table read restricted to the given ring ranges.
     *
     * @param rr ring ranges to read; collected into a set before being passed on
     * @return the configured read transform
     */
    private CassandraIO.Read<Scientist> getReadWithRingRange(RingRange ... rr) {
        HashSet<RingRange> ranges = new HashSet<RingRange>(Arrays.asList(rr));
        return CassandraIO.read()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withRingRanges(ranges)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withTable(CASSANDRA_TABLE)
            .withCoder((Coder)SerializableCoder.of(Scientist.class))
            .withEntity(Scientist.class);
    }

    /**
     * Builds a {@code scientist}-table read that uses the given CQL query.
     *
     * @param query CQL statement handed to {@code withQuery}
     * @return the configured read transform
     */
    private CassandraIO.Read<Scientist> getReadWithQuery(String query) {
        return CassandraIO.read()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withQuery(query)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withTable(CASSANDRA_TABLE)
            .withCoder((Coder)SerializableCoder.of(Scientist.class))
            .withEntity(Scientist.class);
    }

    /**
     * Feeds two query-based reads (one row each) through {@code CassandraIO.readAll()} and
     * checks that exactly "Einstein" and "Newton" come back.
     */
    @Test
    public void testReadAllQuery() {
        String physQuery = String.format("SELECT * From %s.%s WHERE person_department='phys' AND person_id=0;", CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
        String mathQuery = String.format("SELECT * From %s.%s WHERE person_department='math' AND person_id=6;", CASSANDRA_KEYSPACE, CASSANDRA_TABLE);

        PCollection reads = (PCollection)this.pipeline.apply((PTransform)Create.of(this.getReadWithQuery(physQuery), (Object[])new CassandraIO.Read[]{this.getReadWithQuery(mathQuery)}));
        PCollection scientists = (PCollection)reads.apply((PTransform)CassandraIO.readAll().withCoder((Coder)SerializableCoder.of(Scientist.class)));

        PCollection names = (PCollection)scientists.apply((PTransform)MapElements.via((SimpleFunction)new SimpleFunction<Scientist, String>(){

            public String apply(Scientist scientist) {
                return scientist.name;
            }
        }));
        PAssert.that((PCollection)names).containsInAnyOrder((Object[])new String[]{"Einstein", "Newton"});

        PCollection total = (PCollection)scientists.apply("count", Count.globally());
        PAssert.thatSingleton((PCollection)total).isEqualTo((Object)2L);
        this.pipeline.run();
    }

    /**
     * Feeds ring-range-based reads for the "phys", "math" and "logic" partition keys
     * through {@code CassandraIO.readAll()} and checks the per-department row counts
     * (10, 4 and 2 respectively).
     */
    @Test
    public void testReadAllRingRange() {
        Metadata metadata = cluster.getMetadata();
        RingRange physRR = CassandraIOTest.fromEncodedKey(metadata, TypeCodec.varchar().serialize((Object)"phys", ProtocolVersion.V3));
        metadata = cluster.getMetadata();
        RingRange mathRR = CassandraIOTest.fromEncodedKey(metadata, TypeCodec.varchar().serialize((Object)"math", ProtocolVersion.V3));
        metadata = cluster.getMetadata();
        RingRange logicRR = CassandraIOTest.fromEncodedKey(metadata, TypeCodec.varchar().serialize((Object)"logic", ProtocolVersion.V3));

        PCollection reads = (PCollection)this.pipeline.apply((PTransform)Create.of(this.getReadWithRingRange(physRR), (Object[])new CassandraIO.Read[]{this.getReadWithRingRange(mathRR, logicRR)}));
        PCollection scientists = (PCollection)reads.apply((PTransform)CassandraIO.readAll().withCoder((Coder)SerializableCoder.of(Scientist.class)));

        // Key every row by department so occurrences can be counted per department.
        PCollection byDepartment = (PCollection)scientists.apply((PTransform)MapElements.via((SimpleFunction)new SimpleFunction<Scientist, KV<String, Integer>>(){

            public KV<String, Integer> apply(Scientist scientist) {
                return KV.of((Object)scientist.department, (Object)scientist.id);
            }
        }));
        PCollection perDepartment = (PCollection)byDepartment.apply("Count occurrences per department", Count.perKey());
        PAssert.that((PCollection)perDepartment).satisfies((SerializableFunction & Serializable)input -> {
            HashMap<String, Long> countsByDepartment = new HashMap<String, Long>();
            for (KV element : input) {
                countsByDepartment.put((String)element.getKey(), (Long)element.getValue());
            }
            Assert.assertEquals((long)3L, (long)countsByDepartment.size());
            Assert.assertEquals((long)10L, (long)((Long)countsByDepartment.get("phys")));
            Assert.assertEquals((long)4L, (long)((Long)countsByDepartment.get("math")));
            Assert.assertEquals((long)2L, (long)((Long)countsByDepartment.get("logic")));
            return null;
        });
        this.pipeline.run();
    }

    /**
     * Reads with a literal query that selects {@code person_id} and
     * {@code writetime(person_name)} for a single row; expects one result whose
     * {@code name} is null and whose {@code nameTs} is a positive value.
     */
    @Test
    public void testReadWithQuery() throws Exception {
        String query = String.format("select person_id, writetime(person_name) from %s.%s where person_id=10 AND person_department='logic'", CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
        PTransform read = (PTransform)CassandraIO.read()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withTable(CASSANDRA_TABLE)
            .withMinNumberOfSplits(Integer.valueOf(20))
            .withQuery(query)
            .withCoder((Coder)SerializableCoder.of(Scientist.class))
            .withEntity(Scientist.class);
        PCollection scientists = (PCollection)this.pipeline.apply(read);

        PCollection total = (PCollection)scientists.apply("Count", Count.globally());
        PAssert.thatSingleton((PCollection)total).isEqualTo((Object)1L);
        PAssert.that((PCollection)scientists).satisfies((SerializableFunction & Serializable)input -> {
            for (Scientist sci : input) {
                // The name column is not selected; only its write time is.
                Assert.assertNull((Object)sci.name);
                TestCase.assertTrue(sci.nameTs != null && sci.nameTs > 0L);
            }
            return null;
        });
        this.pipeline.run();
    }

    /**
     * Same as the unfiltered-query read, but the query is supplied through a
     * {@link MockQueryProvider}; expects all {@link #NUM_ROWS} rows, each with a null
     * {@code name} and a positive {@code nameTs}.
     */
    @Test
    public void testReadWithQueryProvider() throws Exception {
        String query = String.format("select person_id, writetime(person_name) from %s.%s", CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
        PTransform read = (PTransform)CassandraIO.read()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withTable(CASSANDRA_TABLE)
            .withMinNumberOfSplits(Integer.valueOf(20))
            .withQuery((ValueProvider)new MockQueryProvider(query))
            .withCoder((Coder)SerializableCoder.of(Scientist.class))
            .withEntity(Scientist.class);
        PCollection scientists = (PCollection)this.pipeline.apply(read);

        PCollection total = (PCollection)scientists.apply("Count", Count.globally());
        PAssert.thatSingleton((PCollection)total).isEqualTo((Object)NUM_ROWS);
        PAssert.that((PCollection)scientists).satisfies((SerializableFunction & Serializable)input -> {
            for (Scientist sci : input) {
                // The name column is not selected; only its write time is.
                Assert.assertNull((Object)sci.name);
                TestCase.assertTrue(sci.nameTs != null && sci.nameTs > 0L);
            }
            return null;
        });
        this.pipeline.run();
    }

    /**
     * Supplies a single-row {@code WHERE} query through a {@link MockQueryProvider};
     * expects one result whose {@code name} is null and whose {@code nameTs} is positive.
     */
    @Test
    public void testReadWithQueryProviderWithWhereQuery() throws Exception {
        String query = String.format("select person_id, writetime(person_name) from %s.%s where person_id=10 AND person_department='logic'", CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
        PTransform read = (PTransform)CassandraIO.read()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withTable(CASSANDRA_TABLE)
            .withMinNumberOfSplits(Integer.valueOf(20))
            .withQuery((ValueProvider)new MockQueryProvider(query))
            .withCoder((Coder)SerializableCoder.of(Scientist.class))
            .withEntity(Scientist.class);
        PCollection scientists = (PCollection)this.pipeline.apply(read);

        PCollection total = (PCollection)scientists.apply("Count", Count.globally());
        PAssert.thatSingleton((PCollection)total).isEqualTo((Object)1L);
        PAssert.that((PCollection)scientists).satisfies((SerializableFunction & Serializable)input -> {
            for (Scientist sci : input) {
                // The name column is not selected; only its write time is.
                Assert.assertNull((Object)sci.name);
                TestCase.assertTrue(sci.nameTs != null && sci.nameTs > 0L);
            }
            return null;
        });
        this.pipeline.run();
    }

    /**
     * Reads with a literal query that has no {@code WHERE} clause; expects all
     * {@link #NUM_ROWS} rows, each with a null {@code name} and a positive {@code nameTs}.
     */
    @Test
    public void testReadWithUnfilteredQuery() throws Exception {
        String query = String.format("select person_id, writetime(person_name) from %s.%s", CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
        PTransform read = (PTransform)CassandraIO.read()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withTable(CASSANDRA_TABLE)
            .withMinNumberOfSplits(Integer.valueOf(20))
            .withQuery(query)
            .withCoder((Coder)SerializableCoder.of(Scientist.class))
            .withEntity(Scientist.class);
        PCollection scientists = (PCollection)this.pipeline.apply(read);

        PCollection total = (PCollection)scientists.apply("Count", Count.globally());
        PAssert.thatSingleton((PCollection)total).isEqualTo((Object)NUM_ROWS);
        PAssert.that((PCollection)scientists).satisfies((SerializableFunction & Serializable)input -> {
            for (Scientist sci : input) {
                // The name column is not selected; only its write time is.
                Assert.assertNull((Object)sci.name);
                TestCase.assertTrue(sci.nameTs != null && sci.nameTs > 0L);
            }
            return null;
        });
        this.pipeline.run();
    }

    /**
     * Writes {@link #NUM_ROWS} {@link ScientistWrite} entities through
     * {@code CassandraIO.write()} and checks that the {@code scientist_write} table then
     * holds that many rows, each with a name of the form {@code "Name <digits>"}.
     */
    @Test
    public void testWrite() {
        ArrayList<ScientistWrite> entities = new ArrayList<ScientistWrite>();
        for (int id = 0; id < NUM_ROWS; id++) {
            ScientistWrite entity = new ScientistWrite();
            entity.id = id;
            entity.name = "Name " + id;
            entity.department = "bio";
            entities.add(entity);
        }

        PTransform write = (PTransform)CassandraIO.write()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withEntity(ScientistWrite.class);
        PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of(entities));
        input.apply(write);
        this.pipeline.run();

        List<Row> written = this.getRows(CASSANDRA_TABLE_WRITE);
        Assert.assertEquals((long)NUM_ROWS, (long)written.size());
        for (Row row : written) {
            TestCase.assertTrue((boolean)row.getString("person_name").matches("Name (\\d*)"));
        }
    }

    /**
     * Reads the {@code scientist} table through the counting {@link NOOPMapper} and checks
     * that the shared counter ends up at {@link #NUM_ROWS}.
     */
    @Test
    public void testReadWithMapper() throws Exception {
        counter.set(0);
        NOOPMapperFactory mapperFactory = new NOOPMapperFactory();
        PTransform read = (PTransform)CassandraIO.read()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withTable(CASSANDRA_TABLE)
            .withCoder((Coder)SerializableCoder.of(String.class))
            .withEntity(String.class)
            .withMapperFactoryFn((SerializableFunction)mapperFactory);
        this.pipeline.apply(read);
        this.pipeline.run();
        Assert.assertEquals((long)NUM_ROWS, (long)counter.intValue());
    }

    /**
     * Writes a single element through the counting {@link NOOPMapper} and checks that the
     * shared counter was incremented exactly once.
     */
    @Test
    public void testCustomMapperImplWrite() throws Exception {
        counter.set(0);
        NOOPMapperFactory mapperFactory = new NOOPMapperFactory();
        PTransform write = (PTransform)CassandraIO.write()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withMapperFactoryFn((SerializableFunction)mapperFactory)
            .withEntity(String.class);
        PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of((Object)"", (Object[])new String[0]));
        input.apply(write);
        this.pipeline.run();
        Assert.assertEquals((long)1L, (long)counter.intValue());
    }

    /**
     * Deletes a single element through the counting {@link NOOPMapper} and checks that the
     * shared counter was incremented exactly once.
     */
    @Test
    public void testCustomMapperImplDelete() {
        counter.set(0);
        NOOPMapperFactory mapperFactory = new NOOPMapperFactory();
        PTransform delete = (PTransform)CassandraIO.delete()
            .withHosts(Collections.singletonList(CASSANDRA_HOST))
            .withPort(cassandraPort)
            .withKeyspace(CASSANDRA_KEYSPACE)
            .withMapperFactoryFn((SerializableFunction)mapperFactory)
            .withEntity(String.class);
        PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of((Object)"", (Object[])new String[0]));
        input.apply(delete);
        this.pipeline.run();
        Assert.assertEquals((long)1L, (long)counter.intValue());
    }

    /**
     * Fetches {@code person_id} and {@code person_name} for every row of a table in the
     * test keyspace.
     *
     * @param table unqualified table name
     * @return all rows returned by the query
     */
    private List<Row> getRows(String table) {
        String select = String.format("select person_id,person_name from %s.%s", CASSANDRA_KEYSPACE, table);
        return session.execute(select).all();
    }

    /**
     * Deletes the Einstein row (department "phys", id 0) through
     * {@code CassandraIO.delete()} and checks that the {@code scientist} table shrinks
     * from {@link #NUM_ROWS} to {@code NUM_ROWS - 1} rows.
     *
     * <p>The row is re-inserted in a {@code finally} block so that a failing pipeline run
     * or assertion does not leave the shared table one row short for the other tests.
     */
    @Test
    public void testDelete() throws Exception {
        List<Row> results = this.getRows(CASSANDRA_TABLE);
        Assert.assertEquals((long)NUM_ROWS, (long)results.size());

        Scientist einstein = new Scientist();
        einstein.id = 0;
        einstein.department = "phys";
        einstein.name = "Einstein";
        try {
            ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)einstein, (Object[])new Scientist[0]))).apply((PTransform)CassandraIO.delete().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withEntity(Scientist.class));
            this.pipeline.run();
            results = this.getRows(CASSANDRA_TABLE);
            Assert.assertEquals((long)(NUM_ROWS - 1L), (long)results.size());
        } finally {
            // Restore the fixture regardless of the outcome; the INSERT is an upsert, so it is
            // harmless if the row was never deleted.
            session.execute(String.format("INSERT INTO %s.%s(person_department, person_id, person_name) values('phys', " + einstein.id + ", '" + einstein.name + "');", CASSANDRA_KEYSPACE, CASSANDRA_TABLE));
        }
    }

    /**
     * Builds a ring range covering exactly the token of an encoded partition key.
     *
     * @param metadata cluster metadata used to compute the token
     * @param bb serialized partition-key component(s)
     * @return the range from the key's token to that token plus one
     */
    private static RingRange fromEncodedKey(Metadata metadata, ByteBuffer ... bb) {
        Long tokenValue = (Long)metadata.newToken(bb).getValue();
        BigInteger start = BigInteger.valueOf(tokenValue);
        BigInteger end = start.add(BigInteger.ONE);
        return RingRange.of((BigInteger)start, (BigInteger)end);
    }

    // Initializes the static finals that are declared without an initializer above:
    // the class-level temporary folder rule and the counter used by NOOPMapper.
    static {
        TEMPORARY_FOLDER = new TemporaryFolder();
        counter = new AtomicInteger();
    }

    /**
     * {@link Scientist} variant mapped to the {@code scientist_write} table; it adds no
     * fields or behavior of its own and is the entity used by {@code testWrite}.
     */
    @Table(name="scientist_write", keyspace="beam_ks")
    static class ScientistWrite
    extends Scientist {
        ScientistWrite() {
        }
    }

    /** Entity mapped to the {@code simpledata} table: an int partition key plus a text column. */
    @Table(name="simpledata", keyspace="beam_ks")
    static class SimpleData
    implements Serializable {
        @PartitionKey
        int id;
        @Column
        String data;

        SimpleData() {
        }

        /** Renders the entity as {@code "<id>, <data>"}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(id).append(", ").append(data);
            return text.toString();
        }
    }

    /**
     * Entity mapped to the {@code scientist} table.
     *
     * <p>{@code department} is the partition key and {@code id} the clustering column.
     * {@code nameTs} is a computed column holding {@code writetime(person_name)}; it takes
     * no part in {@link #equals(Object)} or {@link #hashCode()}.
     */
    @Table(name="scientist", keyspace="beam_ks")
    static class Scientist
    implements Serializable {
        @Column(name="person_name")
        String name;
        @Computed(value="writetime(person_name)")
        Long nameTs;
        @ClusteringColumn
        @Column(name="person_id")
        int id;
        @PartitionKey
        @Column(name="person_department")
        String department;

        Scientist() {
        }

        /** Renders the entity as {@code "<id>:<name>"}. */
        @Override
        public String toString() {
            return this.id + ":" + this.name;
        }

        /**
         * Two scientists are equal when they have the same runtime class and the same
         * {@code id}, {@code name} and {@code department}.
         */
        @Override
        public boolean equals(@Nullable Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Scientist scientist = (Scientist)o;
            return this.id == scientist.id && Objects.equal((Object)this.name, (Object)scientist.name) && Objects.equal((Object)this.department, (Object)scientist.department);
        }

        /**
         * Hashes the same three fields that {@link #equals(Object)} compares, so rows that
         * differ only by department no longer share a hash code.
         */
        @Override
        public int hashCode() {
            return Objects.hashCode((Object[])new Object[]{this.name, this.id, this.department});
        }
    }

    private static class NOOPMapper
    implements Mapper<String>,
    Serializable {
        private final ListeningExecutorService executor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(10));
        final Callable<Void> asyncTask = () -> null;

        private NOOPMapper() {
        }

        public Iterator map(ResultSet resultSet) {
            if (!resultSet.isExhausted()) {
                resultSet.iterator().forEachRemaining(r -> counter.getAndIncrement());
            }
            return Collections.emptyIterator();
        }

        public Future<Void> deleteAsync(String entity) {
            counter.incrementAndGet();
            return this.executor.submit(this.asyncTask);
        }

        public Future<Void> saveAsync(String entity) {
            counter.incrementAndGet();
            return this.executor.submit(this.asyncTask);
        }
    }

    private static class NOOPMapperFactory
    implements SerializableFunction<Session, Mapper> {
        private NOOPMapperFactory() {
        }

        public Mapper apply(Session input) {
            return new NOOPMapper();
        }
    }

    static class MockQueryProvider
    implements ValueProvider<String> {
        private volatile String query;

        MockQueryProvider(String query) {
            this.query = query;
        }

        public String get() {
            return this.query;
        }

        public boolean isAccessible() {
            return !this.query.isEmpty();
        }
    }
}

