/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bounded, single-split source reader that runs one SQL query against a ClickHouse server and
 * emits every returned record as a {@link SeaTunnelRow}.
 *
 * <p>Lifecycle: {@link #open()} picks one of the configured servers at random and prepares a
 * request, {@link #pollNext(Collector)} executes the query and forwards the rows, and
 * {@link #close()} releases the client.
 */
public class ClickhouseSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(ClickhouseSourceReader.class);

    /** Candidate servers; one of them is chosen at random in {@link #open()}. */
    private final List<ClickHouseNode> servers;
    /** Client created in {@link #open()}; {@code null} until then. */
    private ClickHouseClient client;
    /** Schema used to size each output row and to convert column values. */
    private final SeaTunnelRowType rowTypeInfo;
    private final SingleSplitReaderContext readerContext;
    /** Request template created in {@link #open()}; {@code null} until then. */
    private ClickHouseRequest<?> request;
    /** Query text executed by {@link #pollNext(Collector)}. */
    private final String sql;

    /**
     * Creates a reader; no connection is made until {@link #open()} is called.
     *
     * @param servers candidate ClickHouse nodes, must contain at least one entry by the time
     *     {@link #open()} runs
     * @param readerContext context that is notified once the query result has been emitted
     * @param sql query to execute
     * @param rowTypeInfo schema of the rows to produce
     */
    ClickhouseSourceReader(List<ClickHouseNode> servers, SingleSplitReaderContext readerContext, String sql, SeaTunnelRowType rowTypeInfo) {
        this.servers = servers;
        this.readerContext = readerContext;
        this.sql = sql;
        this.rowTypeInfo = rowTypeInfo;
    }

    /**
     * Selects a random server, creates a client for its protocol and prepares a request that
     * uses the {@code RowBinaryWithNamesAndTypes} format.
     *
     * @throws IllegalArgumentException if the server list is empty
     */
    @Override
    public void open() {
        if (this.servers.isEmpty()) {
            // Fail with a descriptive message instead of Random's "bound must be positive".
            throw new IllegalArgumentException("No ClickHouse server available to read from");
        }
        Random random = new Random();
        ClickHouseNode server = this.servers.get(random.nextInt(this.servers.size()));
        this.client = ClickHouseClient.newInstance(server.getProtocol());
        this.request = this.client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
    }

    /**
     * Closes the client if {@link #open()} created one; otherwise does nothing.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    public void close() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    /**
     * Executes the configured query, emits one {@link SeaTunnelRow} per returned record and then
     * signals the reader context that there are no more elements. The whole operation runs
     * while holding the collector's checkpoint lock, and the response is closed afterwards.
     *
     * <p>Each invocation executes the query again.
     *
     * @param output collector that receives the converted rows
     * @throws Exception if executing the query or converting a record fails
     */
    @Override
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        synchronized (output.getCheckpointLock()) {
            // Rows are sized by the declared schema, not by the record width.
            final int fieldCount = this.rowTypeInfo.getFieldNames().length;
            try (ClickHouseResponse response = this.request.query(this.sql).executeAndWait()) {
                response.stream().forEach(record -> {
                    Object[] values = new Object[fieldCount];
                    for (int i = 0; i < record.size(); ++i) {
                        // NOTE(review): isNullOrEmpty presumably also matches empty (non-null)
                        // values, which would then be emitted as null - confirm this is intended.
                        values[i] = record.getValue(i).isNullOrEmpty()
                                ? null
                                : TypeConvertUtil.valueUnwrap(this.rowTypeInfo.getFieldType(i), record.getValue(i));
                    }
                    output.collect(new SeaTunnelRow(values));
                });
            }
            this.signalNoMoreElement();
        }
    }

    /** Logs that the bounded read has finished and notifies the reader context. */
    private void signalNoMoreElement() {
        log.info("Closed the bounded ClickHouse source");
        this.readerContext.signalNoMoreElement();
    }
}

