/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceReader;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigMergeable;

/**
 * Bounded SeaTunnel source that reads the result of a user-supplied SQL query from ClickHouse.
 *
 * <p>{@link #prepare(Config)} validates the configuration, builds the list of ClickHouse nodes and
 * derives the produced row type by running the configured query wrapped in a {@code LIMIT 1}
 * sub-select against one randomly chosen node. The remaining methods hand the state collected in
 * {@code prepare} to the reader and enumerator.
 */
@AutoService(value={SeaTunnelSource.class})
public class ClickhouseSource
implements SeaTunnelSource<SeaTunnelRow, ClickhouseSourceSplit, ClickhouseSourceState>,
SupportParallelism,
SupportColumnProjection {
    /** Template shared by every configuration/validation failure raised by this source. */
    private static final String ERROR_TEMPLATE = "PluginName: %s, PluginType: %s, Message: %s";

    /** ClickHouse nodes built from the configuration; populated by {@link #prepare(Config)}. */
    private List<ClickHouseNode> servers;
    /** Row type derived from the probe query; populated by {@link #prepare(Config)}. */
    private SeaTunnelRowType rowTypeInfo;
    /** The user-configured query, exactly as read from the configuration. */
    private String sql;

    /** Returns the plugin identifier of this connector. */
    public String getPluginName() {
        return "Clickhouse";
    }

    /**
     * Validates the configuration, creates the server node list and resolves the output schema.
     *
     * <p>The schema is resolved by executing the configured SQL wrapped in a {@code LIMIT 1}
     * sub-select on a randomly picked node and converting every returned column with
     * {@link TypeConvertUtil#convert}.
     *
     * @param config plugin configuration; host, database, sql, username and password are required
     * @throws ClickhouseConnectorException if a required option is missing, no server node is
     *     available, or the probe query fails (the driver exception is attached as the cause)
     */
    public void prepare(Config config) throws PrepareFailException {
        CheckResult result = CheckConfigUtil.checkAllExists(
                config,
                ClickhouseConfig.HOST.key(),
                ClickhouseConfig.DATABASE.key(),
                ClickhouseConfig.SQL.key(),
                ClickhouseConfig.USERNAME.key(),
                ClickhouseConfig.PASSWORD.key());
        if (!result.isSuccess()) {
            throw new ClickhouseConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    this.formatError(result.getMsg()));
        }

        // Only the server time zone has a default; every other option was checked above.
        ImmutableMap<String, Object> defaultConfig = ImmutableMap.<String, Object>builder()
                .put(ClickhouseConfig.SERVER_TIME_ZONE.key(), ClickhouseConfig.SERVER_TIME_ZONE.defaultValue())
                .build();
        config = config.withFallback(ConfigFactory.parseMap(defaultConfig));

        this.servers = ClickhouseUtil.createNodes(
                config.getString(ClickhouseConfig.HOST.key()),
                config.getString(ClickhouseConfig.DATABASE.key()),
                config.getString(ClickhouseConfig.SERVER_TIME_ZONE.key()),
                config.getString(ClickhouseConfig.USERNAME.key()),
                config.getString(ClickhouseConfig.PASSWORD.key()));
        this.sql = config.getString(ClickhouseConfig.SQL.key());

        // ThreadLocalRandom.nextInt(0) would fail with an unrelated "bound must be positive"
        // error, so report the empty node list as the configuration problem it is.
        if (this.servers == null || this.servers.isEmpty()) {
            throw new ClickhouseConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    this.formatError("No ClickHouse server could be created from option '" + ClickhouseConfig.HOST.key() + "'"));
        }

        ClickHouseNode currentServer = this.servers.get(ThreadLocalRandom.current().nextInt(this.servers.size()));
        try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol());
             ClickHouseResponse response = client.connect(currentServer)
                     .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
                     .query(this.modifySQLToLimit1(this.sql))
                     .executeAndWait()) {
            int columnSize = response.getColumns().size();
            String[] fieldNames = new String[columnSize];
            SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType<?>[columnSize];
            for (int i = 0; i < columnSize; ++i) {
                fieldNames[i] = response.getColumns().get(i).getColumnName();
                seaTunnelDataTypes[i] = TypeConvertUtil.convert(response.getColumns().get(i));
            }
            this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
        }
        catch (ClickHouseException e) {
            // Keep the driver exception as the cause so the stack trace and error details survive.
            throw new ClickhouseConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    this.formatError(e.getMessage()),
                    e);
        }
    }

    /** Builds the standard error text for this plugin around the given detail message. */
    private String formatError(String message) {
        return String.format(ERROR_TEMPLATE, this.getPluginName(), PluginType.SOURCE, message);
    }

    /**
     * Wraps the given query in a sub-select limited to one row.
     *
     * @param sql the query to wrap; it is embedded verbatim inside parentheses
     * @return the wrapping {@code SELECT * FROM (...) s LIMIT 1} statement
     */
    private String modifySQLToLimit1(String sql) {
        return String.format("SELECT * FROM (%s) s LIMIT 1", sql);
    }

    /** This source always reports itself as bounded. */
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /** Returns the row type resolved by {@link #prepare(Config)}; {@code null} before that call. */
    public SeaTunnelRowType getProducedType() {
        return this.rowTypeInfo;
    }

    /** Creates a reader over the prepared server list, row type and query. */
    public SourceReader<SeaTunnelRow, ClickhouseSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
        return new ClickhouseSourceReader(this.servers, readerContext, this.rowTypeInfo, this.sql);
    }

    /** Creates a fresh split enumerator for the given context. */
    public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> createEnumerator(SourceSplitEnumerator.Context<ClickhouseSourceSplit> enumeratorContext) throws Exception {
        return new ClickhouseSourceSplitEnumerator(enumeratorContext);
    }

    /**
     * Creates a split enumerator when restoring from a checkpoint.
     *
     * <p>The supplied {@code checkpointState} is not used: a new enumerator is built from the
     * context alone, exactly as in {@link #createEnumerator}.
     */
    public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> restoreEnumerator(SourceSplitEnumerator.Context<ClickhouseSourceSplit> enumeratorContext, ClickhouseSourceState checkpointState) throws Exception {
        return new ClickhouseSourceSplitEnumerator(enumeratorContext);
    }
}

