/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kudu.source;

import com.google.auto.service.AutoService;
import java.util.ArrayList;
import java.util.List;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.kudu.source.KuduSourceReader;
import org.apache.seatunnel.connectors.seatunnel.kudu.source.KuduSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.kudu.source.KuduSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.kudu.source.PartitionParameter;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService(value={SeaTunnelSource.class})
public class KuduSource
implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, KuduSourceState>,
SupportParallelism {
    private static final Logger log = LoggerFactory.getLogger(KuduSource.class);
    private SeaTunnelRowType rowTypeInfo;
    private KuduInputFormat kuduInputFormat;
    private PartitionParameter partitionParameter;
    public static final int TIMEOUTMS = 18000;

    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    public SeaTunnelRowType getProducedType() {
        return this.rowTypeInfo;
    }

    public SourceReader<SeaTunnelRow, KuduSourceSplit> createReader(SourceReader.Context readerContext) {
        return new KuduSourceReader(this.kuduInputFormat, readerContext);
    }

    public Serializer<KuduSourceSplit> getSplitSerializer() {
        return super.getSplitSerializer();
    }

    public SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> createEnumerator(SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext) {
        return new KuduSourceSplitEnumerator(enumeratorContext, this.partitionParameter);
    }

    public SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> restoreEnumerator(SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, KuduSourceState checkpointState) {
        return new KuduSourceSplitEnumerator(enumeratorContext, this.partitionParameter);
    }

    public Serializer<KuduSourceState> getEnumeratorStateSerializer() {
        return new DefaultSerializer();
    }

    public String getPluginName() {
        return "Kudu";
    }

    public void prepare(Config config) {
        String kudumaster = "";
        String tableName = "";
        String columnslist = "";
        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, KuduSourceConfig.KUDU_MASTER.key(), KuduSourceConfig.TABLE_NAME.key(), KuduSourceConfig.COLUMNS_LIST.key());
        if (!checkResult.isSuccess()) {
            throw new KuduConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", new Object[]{this.getPluginName(), PluginType.SINK, checkResult.getMsg()}));
        }
        kudumaster = config.getString(KuduSourceConfig.KUDU_MASTER.key());
        tableName = config.getString(KuduSourceConfig.TABLE_NAME.key());
        columnslist = config.getString(KuduSourceConfig.COLUMNS_LIST.key());
        this.kuduInputFormat = new KuduInputFormat(kudumaster, tableName, columnslist);
        try {
            SeaTunnelRowType seaTunnelRowType;
            KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(kudumaster);
            kuduClientBuilder.defaultOperationTimeoutMs(18000L);
            KuduClient kuduClient = kuduClientBuilder.build();
            this.partitionParameter = this.initPartitionParameter(kuduClient, tableName);
            this.rowTypeInfo = seaTunnelRowType = this.getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
        }
        catch (KuduException e) {
            throw new KuduConnectorException((SeaTunnelErrorCode)KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED, (Throwable)e);
        }
    }

    private PartitionParameter initPartitionParameter(KuduClient kuduClient, String tableName) {
        String keyColumn = null;
        int maxKey = 0;
        int minKey = 0;
        boolean flag = true;
        try {
            KuduScanner.KuduScannerBuilder kuduScannerBuilder = kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
            ArrayList<String> columnsList = new ArrayList<String>();
            keyColumn = kuduClient.openTable(tableName).getSchema().getPrimaryKeyColumns().get(0).getName();
            columnsList.add("" + keyColumn);
            kuduScannerBuilder.setProjectedColumnNames(columnsList);
            KuduScanner kuduScanner = kuduScannerBuilder.build();
            while (kuduScanner.hasMoreRows()) {
                RowResultIterator rowResults = kuduScanner.nextRows();
                while (rowResults.hasNext()) {
                    RowResult row = rowResults.next();
                    int id = row.getInt("" + keyColumn);
                    if (flag) {
                        maxKey = id;
                        minKey = id;
                        flag = false;
                        continue;
                    }
                    if (id >= maxKey) {
                        maxKey = id;
                    }
                    if (id > minKey) continue;
                    minKey = id;
                }
            }
        }
        catch (KuduException e) {
            throw new KuduConnectorException((SeaTunnelErrorCode)KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED, "Failed to generate upper and lower limits for each partition");
        }
        return new PartitionParameter(keyColumn, Long.parseLong(minKey + ""), Long.parseLong(maxKey + ""));
    }

    public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
        ArrayList seaTunnelDataTypes = new ArrayList();
        ArrayList<String> fieldNames = new ArrayList<String>();
        try {
            for (int i = 0; i < columnSchemaList.size(); ++i) {
                fieldNames.add(columnSchemaList.get(i).getName());
                seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
            }
        }
        catch (Exception e) {
            throw new KuduConnectorException((SeaTunnelErrorCode)CommonErrorCode.TABLE_SCHEMA_GET_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", new Object[]{"Kudu", PluginType.SOURCE, ExceptionUtils.getMessage(e)}));
        }
        return new SeaTunnelRowType(fieldNames.toArray(new String[0]), seaTunnelDataTypes.toArray(new SeaTunnelDataType[0]));
    }
}

