/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;

import com.clickhouse.client.ClickHouseRequest;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.FileTransfer;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.FileTransferFactory;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClickhouseFileSinkWriter
implements SinkWriter<SeaTunnelRow, CKFileCommitInfo, ClickhouseSinkState> {
    private static final Logger log = LoggerFactory.getLogger(ClickhouseFileSinkWriter.class);
    private static final String CK_LOCAL_CONFIG_TEMPLATE = "<yandex><path> %s </path> <users><default><password/> <profile>default</profile> <quota>default</quota><access_management>1</access_management></default></users><profiles><default/></profiles><quotas><default/></quotas></yandex>";
    private static final String CLICKHOUSE_SETTINGS_KEY = "SETTINGS";
    private static final String CLICKHOUSE_DDL_SETTING_FILTER = "storage_policy";
    private static final String CLICKHOUSE_LOCAL_FILE_SUFFIX = "/local_data.log";
    private static final int UUID_LENGTH = 10;
    private final FileReaderOption readerOption;
    private final ShardRouter shardRouter;
    private final ClickhouseProxy proxy;
    private final ClickhouseTable clickhouseTable;
    private final Map<Shard, List<String>> shardLocalDataPaths;
    private final Map<Shard, FileChannel> rowCache;
    private final Map<Shard, MappedByteBuffer> bufferCache;
    private final Integer bufferSize = 131072;
    private final Map<Shard, String> shardTempFile;
    private final SinkWriter.Context context;
    private final ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();

    public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Context context) {
        this.readerOption = readerOption;
        this.context = context;
        this.proxy = new ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode());
        this.shardRouter = new ShardRouter(this.proxy, this.readerOption.getShardMetadata());
        this.clickhouseTable = this.proxy.getClickhouseTable(this.proxy.getClickhouseConnection(), this.readerOption.getShardMetadata().getDatabase(), this.readerOption.getShardMetadata().getTable());
        this.rowCache = new HashMap<Shard, FileChannel>(16);
        this.bufferCache = new HashMap<Shard, MappedByteBuffer>(16);
        this.shardTempFile = new HashMap<Shard, String>();
        this.nodePasswordCheck();
        this.shardLocalDataPaths = this.shardRouter.getShards().values().stream().collect(Collectors.toMap(Function.identity(), shard -> {
            ClickHouseRequest<?> request = this.proxy.getClickhouseConnection((Shard)shard);
            ClickhouseTable shardTable = this.proxy.getClickhouseTable(request, shard.getNode().getDatabase().get(), this.clickhouseTable.getLocalTableName());
            return shardTable.getDataPaths();
        }));
    }

    public void write(SeaTunnelRow element) throws IOException {
        Shard shard = this.shardRouter.getShard(element);
        FileChannel channel = this.rowCache.computeIfAbsent(shard, k -> {
            String uuid = UUID.randomUUID().toString().substring(0, 10).replaceAll("-", "_");
            String clickhouseLocalFile = String.format("%s/%s", this.readerOption.getFileTempPath(), uuid);
            try {
                FileUtils.forceMkdir(new File(clickhouseLocalFile));
                String clickhouseLocalFileTmpFile = clickhouseLocalFile + CLICKHOUSE_LOCAL_FILE_SUFFIX;
                this.shardTempFile.put(shard, clickhouseLocalFileTmpFile);
                return FileChannel.open(Paths.get(clickhouseLocalFileTmpFile, new String[0]), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
            }
            catch (IOException e) {
                throw CommonError.fileOperationFailed((String)"ClickhouseFile", (String)"write", (String)clickhouseLocalFile, (Throwable)e);
            }
        });
        this.saveDataToFile(channel, element, shard);
    }

    private void nodePasswordCheck() {
        if (!this.readerOption.isNodeFreePass()) {
            this.shardRouter.getShards().values().forEach(shard -> {
                if (!this.readerOption.getNodePassword().containsKey(shard.getNode().getAddress().getHostName()) && !this.readerOption.getNodePassword().containsKey(shard.getNode().getHost())) {
                    throw new ClickhouseConnectorException((SeaTunnelErrorCode)ClickhouseConnectorErrorCode.PASSWORD_NOT_FOUND_IN_SHARD_NODE, "Cannot find password of shard " + shard.getNode().getAddress().getHostName());
                }
            });
        }
    }

    public Optional<CKFileCommitInfo> prepareCommit() throws IOException {
        for (FileChannel channel : this.rowCache.values()) {
            channel.close();
        }
        HashMap<Shard, List<String>> detachedFiles = new HashMap<Shard, List<String>>();
        this.shardTempFile.forEach((shard, path) -> {
            List<String> clickhouseLocalFiles = null;
            try {
                clickhouseLocalFiles = this.generateClickhouseLocalFiles((String)path);
                this.moveClickhouseLocalFileToServer((Shard)shard, clickhouseLocalFiles);
                detachedFiles.put((Shard)shard, clickhouseLocalFiles);
            }
            catch (Exception e) {
                throw new ClickhouseConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
            }
            finally {
                if (clickhouseLocalFiles != null && !clickhouseLocalFiles.isEmpty()) {
                    this.clearLocalFileDirectory(clickhouseLocalFiles);
                }
            }
        });
        this.rowCache.clear();
        this.shardTempFile.clear();
        return Optional.of(new CKFileCommitInfo(detachedFiles));
    }

    public void abortPrepare() {
    }

    public void close() throws IOException {
        for (FileChannel channel : this.rowCache.values()) {
            channel.close();
        }
    }

    private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row, Shard shard) throws IOException {
        String data = this.readerOption.getFields().stream().map(field -> {
            Object fieldValueObj = row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field));
            if (fieldValueObj == null) {
                return "";
            }
            return fieldValueObj.toString();
        }).collect(Collectors.joining(this.readerOption.getFileFieldsDelimiter())) + "\n";
        MappedByteBuffer buffer = this.bufferCache.computeIfAbsent(shard, k -> {
            try {
                return fileChannel.map(FileChannel.MapMode.READ_WRITE, 0L, this.bufferSize.intValue());
            }
            catch (IOException e) {
                throw CommonError.fileOperationFailed((String)"ClickhouseFile", (String)"write", (String)"UNKNOWN", (Throwable)e);
            }
        });
        byte[] byteData = data.getBytes(StandardCharsets.UTF_8);
        if (buffer.position() + byteData.length > buffer.capacity()) {
            buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), this.bufferSize.intValue());
            this.bufferCache.put(shard, buffer);
        }
        buffer.put(byteData);
    }

    private List<String> generateClickhouseLocalFiles(String clickhouseLocalFileTmpFile) throws IOException, InterruptedException {
        String line2;
        Throwable throwable;
        BufferedReader bufferedReader2;
        Throwable throwable2;
        InputStreamReader inputStreamReader;
        String[] tmpStrArr = clickhouseLocalFileTmpFile.split("/");
        String uuid = tmpStrArr[tmpStrArr.length - 2];
        List localPaths = Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" ")).collect(Collectors.toList());
        String clickhouseLocalFile = clickhouseLocalFileTmpFile.substring(0, clickhouseLocalFileTmpFile.length() - CLICKHOUSE_LOCAL_FILE_SUFFIX.length());
        ArrayList command = new ArrayList(localPaths);
        if (localPaths.size() == 1) {
            command.add("local");
        }
        command.add("--file");
        command.add(clickhouseLocalFileTmpFile);
        command.add("--format_csv_delimiter");
        command.add("\"" + this.readerOption.getFileFieldsDelimiter() + "\"");
        command.add("-S");
        command.add("\"" + this.readerOption.getFields().stream().map(field -> field + " " + this.readerOption.getTableSchema().get(field)).collect(Collectors.joining(",")) + "\"");
        command.add("-N");
        command.add("\"temp_table" + uuid + "\"");
        command.add("-d _local");
        command.add("-n");
        command.add("-q");
        command.add(String.format("\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"", this.adjustClickhouseDDL(), this.clickhouseTable.getLocalTableName(), this.readerOption.getTableSchema().keySet().stream().map(s -> {
            if (this.readerOption.getFields().contains(s)) {
                return s;
            }
            return "NULL";
        }).collect(Collectors.joining(",")), uuid));
        if (this.readerOption.isCompatibleMode()) {
            String ckLocalConfigPath = String.format("%s/%s/config.xml", this.readerOption.getFileTempPath(), uuid);
            try (FileWriter writer = new FileWriter(ckLocalConfigPath);){
                writer.write(String.format(CK_LOCAL_CONFIG_TEMPLATE, clickhouseLocalFile));
            }
            catch (IOException e) {
                throw CommonError.fileOperationFailed((String)"ClickhouseFile", (String)"write", (String)clickhouseLocalFile, (Throwable)e);
            }
            command.add("--config-file");
            command.add("\"" + ckLocalConfigPath + "\"");
        } else {
            command.add("--path");
            command.add("\"" + clickhouseLocalFile + "\"");
        }
        log.info("Generate clickhouse local file command: {}", (Object)String.join((CharSequence)" ", command));
        ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join((CharSequence)" ", command));
        Process start = processBuilder.start();
        try (InputStream inputStream = start.getInputStream();){
            inputStreamReader = new InputStreamReader(inputStream);
            throwable2 = null;
            try {
                bufferedReader2 = new BufferedReader(inputStreamReader);
                throwable = null;
                try {
                    while ((line2 = bufferedReader2.readLine()) != null) {
                        log.info(line2);
                    }
                }
                catch (Throwable line2) {
                    throwable = line2;
                    throw line2;
                }
                finally {
                    if (bufferedReader2 != null) {
                        if (throwable != null) {
                            try {
                                bufferedReader2.close();
                            }
                            catch (Throwable line2) {
                                throwable.addSuppressed(line2);
                            }
                        } else {
                            bufferedReader2.close();
                        }
                    }
                }
            }
            catch (Throwable bufferedReader2) {
                throwable2 = bufferedReader2;
                throw bufferedReader2;
            }
            finally {
                if (inputStreamReader != null) {
                    if (throwable2 != null) {
                        try {
                            inputStreamReader.close();
                        }
                        catch (Throwable bufferedReader2) {
                            throwable2.addSuppressed(bufferedReader2);
                        }
                    } else {
                        inputStreamReader.close();
                    }
                }
            }
        }
        inputStream = start.getErrorStream();
        var10_13 = null;
        try {
            inputStreamReader = new InputStreamReader(inputStream);
            throwable2 = null;
            try {
                bufferedReader2 = new BufferedReader(inputStreamReader);
                throwable = null;
                try {
                    while ((line2 = bufferedReader2.readLine()) != null) {
                        log.error(line2);
                    }
                }
                catch (Throwable throwable3) {
                    throwable = throwable3;
                    throw throwable3;
                }
                finally {
                    if (bufferedReader2 != null) {
                        if (throwable != null) {
                            try {
                                bufferedReader2.close();
                            }
                            catch (Throwable throwable4) {
                                throwable.addSuppressed(throwable4);
                            }
                        } else {
                            bufferedReader2.close();
                        }
                    }
                }
            }
            catch (Throwable throwable5) {
                throwable2 = throwable5;
                throw throwable5;
            }
            finally {
                if (inputStreamReader != null) {
                    if (throwable2 != null) {
                        try {
                            inputStreamReader.close();
                        }
                        catch (Throwable throwable6) {
                            throwable2.addSuppressed(throwable6);
                        }
                    } else {
                        inputStreamReader.close();
                    }
                }
            }
        }
        catch (Throwable throwable7) {
            var10_13 = throwable7;
            throw throwable7;
        }
        finally {
            if (inputStream != null) {
                if (var10_13 != null) {
                    try {
                        inputStream.close();
                    }
                    catch (Throwable throwable8) {
                        var10_13.addSuppressed(throwable8);
                    }
                } else {
                    inputStream.close();
                }
            }
        }
        start.waitFor();
        File file = new File(clickhouseLocalFile + "/data/_local/" + this.clickhouseTable.getLocalTableName());
        if (!file.exists()) {
            throw new ClickhouseConnectorException((SeaTunnelErrorCode)ClickhouseConnectorErrorCode.FILE_NOT_EXISTS, "clickhouse local file not exists");
        }
        File[] files = file.listFiles();
        if (files == null) {
            throw new ClickhouseConnectorException((SeaTunnelErrorCode)ClickhouseConnectorErrorCode.FILE_NOT_EXISTS, "clickhouse local file not exists");
        }
        return Arrays.stream(files).filter(File::isDirectory).filter(f -> !"detached".equals(f.getName())).map(f -> {
            File newFile = new File(f.getParent() + "/" + f.getName() + "_" + this.context.getIndexOfSubtask());
            if (f.renameTo(newFile)) {
                return newFile;
            }
            log.warn("rename file failed, will continue move file, but maybe cause file conflict");
            return f;
        }).map(File::getAbsolutePath).collect(Collectors.toList());
    }

    private void moveClickhouseLocalFileToServer(Shard shard, List<String> clickhouseLocalFiles) {
        String hostAddress = shard.getNode().getHost();
        String user = this.readerOption.getNodeUser().getOrDefault(hostAddress, "root");
        String password = this.readerOption.getNodePassword().getOrDefault(hostAddress, null);
        String keyPath = this.readerOption.getKeyPath();
        FileTransfer fileTransfer = FileTransferFactory.createFileTransfer(this.readerOption.getCopyMethod(), hostAddress, user, password, keyPath);
        fileTransfer.init();
        int randomPath = this.threadLocalRandom.nextInt(this.shardLocalDataPaths.get(shard).size());
        fileTransfer.transferAndChown(clickhouseLocalFiles, this.shardLocalDataPaths.get(shard).get(randomPath) + "detached/");
        fileTransfer.close();
    }

    private void clearLocalFileDirectory(List<String> clickhouseLocalFiles) {
        String clickhouseLocalFile = clickhouseLocalFiles.get(0);
        String localFileDir = clickhouseLocalFile.substring(0, this.readerOption.getFileTempPath().length() + 10 + 1);
        try {
            File file = new File(localFileDir);
            if (file.exists()) {
                FileUtils.deleteDirectory(file);
            }
        }
        catch (IOException e) {
            throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.DELETE_DIRECTORY_FIELD, "Unable to delete directory " + localFileDir, e);
        }
    }

    private String adjustClickhouseDDL() {
        String createTableDDL = this.clickhouseTable.getCreateTableDDL().replace(this.clickhouseTable.getDatabase() + ".", "").replaceAll("`", "");
        if (createTableDDL.contains(CLICKHOUSE_SETTINGS_KEY)) {
            List filters = Arrays.stream(CLICKHOUSE_DDL_SETTING_FILTER.split(",")).collect(Collectors.toList());
            int p = createTableDDL.indexOf(CLICKHOUSE_SETTINGS_KEY);
            String filteredSetting = Arrays.stream(createTableDDL.substring(p + CLICKHOUSE_SETTINGS_KEY.length()).split(",")).filter(e -> !filters.contains(e.split("=")[0].trim())).collect(Collectors.joining(","));
            createTableDDL = createTableDDL.substring(0, p) + CLICKHOUSE_SETTINGS_KEY + filteredSetting;
        }
        return createTableDDL;
    }
}

