/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cassandra.source;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import java.io.IOException;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraSourceReader
extends AbstractSingleSplitReader<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(CassandraSourceReader.class);
    private final CassandraConfig cassandraConfig;
    private final SingleSplitReaderContext readerContext;
    private CqlSession session;

    CassandraSourceReader(CassandraConfig cassandraConfig, SingleSplitReaderContext readerContext) {
        this.cassandraConfig = cassandraConfig;
        this.readerContext = readerContext;
    }

    public void open() throws Exception {
        this.session = (CqlSession)CassandraClient.getCqlSessionBuilder(this.cassandraConfig.getHost(), this.cassandraConfig.getKeyspace(), this.cassandraConfig.getUsername(), this.cassandraConfig.getPassword(), this.cassandraConfig.getDatacenter()).build();
    }

    public void close() throws IOException {
        if (this.session != null) {
            this.session.close();
        }
    }

    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        try {
            ResultSet resultSet = this.session.execute(CassandraClient.createSimpleStatement(this.cassandraConfig.getCql(), this.cassandraConfig.getConsistencyLevel()));
            resultSet.forEach(row -> output.collect((Object)TypeConvertUtil.buildSeaTunnelRow(row)));
        }
        finally {
            this.readerContext.signalNoMoreElement();
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

