/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;

import com.clickhouse.client.ClickHouseNode;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseCatalog;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;

public class ClickhouseSink
implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo>,
SupportSaveMode {
    private ReaderOption option;
    private CatalogTable catalogTable;
    private ReadonlyConfig readonlyConfig;

    public ClickhouseSink(CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
        this.catalogTable = catalogTable;
        this.readonlyConfig = readonlyConfig;
    }

    public String getPluginName() {
        return "Clickhouse";
    }

    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> createWriter(SinkWriter.Context context) throws IOException {
        List<ClickHouseNode> nodes = ClickhouseUtil.createNodes(this.readonlyConfig);
        Properties clickhouseProperties = new Properties();
        ((Map)this.readonlyConfig.get(ClickhouseBaseOptions.CLICKHOUSE_CONFIG)).forEach((key, value) -> clickhouseProperties.put(key, String.valueOf(value)));
        clickhouseProperties.put("user", this.readonlyConfig.get(ClickhouseBaseOptions.USERNAME));
        clickhouseProperties.put("password", this.readonlyConfig.get(ClickhouseBaseOptions.PASSWORD));
        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
        Map<String, String> tableSchema = proxy.getClickhouseTableSchema((String)this.readonlyConfig.get(ClickhouseSinkOptions.TABLE));
        String shardKey = null;
        String shardKeyType = null;
        ClickhouseTable table = proxy.getClickhouseTable(proxy.getClickhouseConnection(), (String)this.readonlyConfig.get(ClickhouseBaseOptions.DATABASE), (String)this.readonlyConfig.get(ClickhouseSinkOptions.TABLE));
        if (((Boolean)this.readonlyConfig.get(ClickhouseSinkOptions.SPLIT_MODE)).booleanValue()) {
            if (!"Distributed".equals(table.getEngine())) {
                throw new ClickhouseConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "split mode only support table which engine is 'Distributed' engine at now");
            }
            if (this.readonlyConfig.getOptional(ClickhouseSinkOptions.SHARDING_KEY).isPresent()) {
                shardKey = (String)this.readonlyConfig.get(ClickhouseSinkOptions.SHARDING_KEY);
                shardKeyType = tableSchema.get(shardKey);
            }
        }
        ShardMetadata metadata = new ShardMetadata(shardKey, shardKeyType, table.getSortingKey(), (String)this.readonlyConfig.get(ClickhouseBaseOptions.DATABASE), (String)this.readonlyConfig.get(ClickhouseSinkOptions.TABLE), table.getEngine(), (Boolean)this.readonlyConfig.get(ClickhouseSinkOptions.SPLIT_MODE), new Shard(1, 1, nodes.get(0)), (String)this.readonlyConfig.get(ClickhouseBaseOptions.USERNAME), (String)this.readonlyConfig.get(ClickhouseBaseOptions.PASSWORD));
        proxy.close();
        String[] primaryKeys = null;
        if (this.readonlyConfig.getOptional(ClickhouseSinkOptions.PRIMARY_KEY).isPresent()) {
            String primaryKey = (String)this.readonlyConfig.get(ClickhouseSinkOptions.PRIMARY_KEY);
            if (primaryKey == null || primaryKey.trim().isEmpty()) {
                throw new ClickhouseConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "primary_key can not be empty");
            }
            if (shardKey != null && !Objects.equals(primaryKey, shardKey)) {
                throw new ClickhouseConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "sharding_key and primary_key must be consistent to ensure correct processing of cdc events");
            }
            primaryKeys = primaryKey.replaceAll("\\s+", "").split(",");
        }
        boolean supportUpsert = (Boolean)this.readonlyConfig.get(ClickhouseSinkOptions.SUPPORT_UPSERT);
        boolean allowExperimentalLightweightDelete = (Boolean)this.readonlyConfig.get(ClickhouseSinkOptions.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE);
        ReaderOption option = ReaderOption.builder().shardMetadata(metadata).properties(clickhouseProperties).seaTunnelRowType(this.catalogTable.getSeaTunnelRowType()).tableEngine(table.getEngine()).tableSchema(tableSchema).bulkSize((Integer)this.readonlyConfig.get(ClickhouseSinkOptions.BULK_SIZE)).primaryKeys(primaryKeys).supportUpsert(supportUpsert).allowExperimentalLightweightDelete(allowExperimentalLightweightDelete).build();
        return new ClickhouseSinkWriter(option, context);
    }

    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> restoreWriter(SinkWriter.Context context, List<ClickhouseSinkState> states) throws IOException {
        return super.restoreWriter(context, states);
    }

    public Optional<Serializer<ClickhouseSinkState>> getWriterStateSerializer() {
        return Optional.of(new DefaultSerializer());
    }

    public Optional<CatalogTable> getWriteCatalogTable() {
        return Optional.of(this.catalogTable);
    }

    public Optional<SaveModeHandler> getSaveModeHandler() {
        TablePath tablePath = TablePath.of((String)((String)this.readonlyConfig.get(ClickhouseBaseOptions.DATABASE)), (String)((String)this.readonlyConfig.get(ClickhouseSinkOptions.TABLE)));
        ClickhouseCatalog clickhouseCatalog = new ClickhouseCatalog(this.readonlyConfig, "clickhouse");
        SchemaSaveMode schemaSaveMode = (SchemaSaveMode)this.readonlyConfig.get(ClickhouseSinkOptions.SCHEMA_SAVE_MODE);
        DataSaveMode dataSaveMode = (DataSaveMode)this.readonlyConfig.get(ClickhouseSinkOptions.DATA_SAVE_MODE);
        return Optional.of(new DefaultSaveModeHandler(schemaSaveMode, dataSaveMode, (Catalog)clickhouseCatalog, tablePath, this.catalogTable, (String)this.readonlyConfig.get(ClickhouseSinkOptions.CUSTOM_SQL)));
    }
}

