/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;

public class ClickhouseSourceReader
implements SourceReader<SeaTunnelRow, ClickhouseSourceSplit> {
    private final List<ClickHouseNode> servers;
    private ClickHouseClient client;
    private final SeaTunnelRowType rowTypeInfo;
    private final SourceReader.Context readerContext;
    private ClickHouseRequest<?> request;
    private final String sql;
    private final List<ClickhouseSourceSplit> splits;

    ClickhouseSourceReader(List<ClickHouseNode> servers, SourceReader.Context readerContext, SeaTunnelRowType rowTypeInfo, String sql) {
        this.servers = servers;
        this.readerContext = readerContext;
        this.rowTypeInfo = rowTypeInfo;
        this.sql = sql;
        this.splits = new ArrayList<ClickhouseSourceSplit>();
    }

    public void open() {
        Random random = new Random();
        ClickHouseNode server = this.servers.get(random.nextInt(this.servers.size()));
        this.client = ClickHouseClient.newInstance(server.getProtocol());
        this.request = this.client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
    }

    public void close() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        if (!this.splits.isEmpty()) {
            try (ClickHouseResponse response = ((ClickHouseRequest)this.request.query(this.sql)).executeAndWait();){
                response.stream().forEach(record -> {
                    Object[] values = new Object[this.rowTypeInfo.getFieldNames().length];
                    for (int i = 0; i < record.size(); ++i) {
                        values[i] = record.getValue(i).isNullOrEmpty() ? null : TypeConvertUtil.valueUnwrap(this.rowTypeInfo.getFieldType(i), record.getValue(i));
                    }
                    output.collect((Object)new SeaTunnelRow(values));
                });
            }
            this.readerContext.signalNoMoreElement();
            this.splits.clear();
        }
    }

    public List<ClickhouseSourceSplit> snapshotState(long checkpointId) throws Exception {
        return Collections.emptyList();
    }

    public void addSplits(List<ClickhouseSourceSplit> splits) {
        this.splits.addAll(splits);
    }

    public void handleNoMoreSplits() {
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

