/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import io.airlift.compress.lzo.LzopCodec;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Read strategy that treats every line of a file as one JSON document and emits it as a
 * {@link SeaTunnelRow}.
 *
 * <p>The file is opened through {@code hadoopFileSystemProxy}, optionally wrapped in an LZO
 * decompressor, decoded with the configured character encoding and deserialized line by line
 * with a {@link JsonDeserializationSchema}. The row type must be supplied by the caller via
 * {@link #setSeaTunnelRowTypeInfo(SeaTunnelRowType)}; it is never inferred from the file.
 */
public class JsonReadStrategy extends AbstractReadStrategy {
    private static final Logger log = LoggerFactory.getLogger(JsonReadStrategy.class);

    /** Converts the UTF-8 bytes of one line into a row; created in {@link #setSeaTunnelRowTypeInfo}. */
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;

    /** Compression applied to the source files; starts at the option's default until {@link #init} runs. */
    private CompressFormat compressFormat = (CompressFormat) BaseSourceConfigOptions.COMPRESS_CODEC.defaultValue();

    /** Charset name used to decode the file content; starts at the option's default until {@link #init} runs. */
    private String encoding = (String) BaseSourceConfigOptions.ENCODING.defaultValue();

    /**
     * Initializes the strategy and picks up the compression codec and encoding from the plugin
     * configuration.
     *
     * @param conf Hadoop configuration forwarded to the parent strategy
     * @throws IllegalArgumentException if the configured codec does not name a {@link CompressFormat} constant
     */
    @Override
    public void init(HadoopConf conf) {
        super.init(conf);
        String codecKey = BaseSourceConfigOptions.COMPRESS_CODEC.key();
        if (this.pluginConfig.hasPath(codecKey)) {
            String compressCodec = this.pluginConfig.getString(codecKey);
            // Locale.ROOT keeps the enum lookup independent of the JVM default locale
            // (e.g. the Turkish dotted/dotless 'i' would otherwise break names containing 'i').
            this.compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT));
        }
        // Fall back to UTF-8 when no encoding is configured.
        this.encoding = ReadonlyConfig.fromConfig((Config) this.pluginConfig)
                .getOptional(BaseSourceConfigOptions.ENCODING)
                .orElse(StandardCharsets.UTF_8.name());
    }

    /**
     * Stores the user-defined row type and builds the matching JSON deserializer. When partition
     * merging is enabled the deserializer is built from the partition-extended row type so the
     * produced rows have room for the partition values appended in {@link #read}.
     *
     * @param seaTunnelRowType row type describing the JSON fields
     */
    @Override
    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
        super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
        SeaTunnelRowType effectiveRowType = this.isMergePartition
                ? this.seaTunnelRowTypeWithPartition
                : this.seaTunnelRowType;
        this.deserializationSchema = new JsonDeserializationSchema(false, false, effectiveRowType);
    }

    /**
     * Reads {@code path} line by line, deserializes each line as JSON and hands the resulting
     * rows to {@code output}.
     *
     * @param path file to read
     * @param tableId table identifier stamped on every emitted row
     * @param output collector receiving the rows
     * @throws FileConnectorException declared by the overridden contract
     * @throws IOException if the file cannot be opened, decoded with the configured encoding, or closed
     */
    @Override
    public void read(String path, String tableId, Collector<SeaTunnelRow> output) throws FileConnectorException, IOException {
        Map<String, String> partitionsMap = this.parsePartitionsByPath(path);
        // The stream is a resource of its own so it is released even when creating the reader
        // fails (for example because the configured encoding is not supported).
        try (InputStream inputStream = this.openJsonInputStream(path);
                BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, this.encoding))) {
            reader.lines().forEach(line -> {
                try {
                    // The line was decoded with the configured encoding; the deserializer is
                    // handed its UTF-8 bytes.
                    SeaTunnelRow seaTunnelRow = this.deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8));
                    if (this.isMergePartition) {
                        // Partition values go right after the regular fields.
                        int index = this.seaTunnelRowType.getTotalFields();
                        for (String value : partitionsMap.values()) {
                            seaTunnelRow.setField(index++, value);
                        }
                    }
                    seaTunnelRow.setTableId(tableId);
                    output.collect(seaTunnelRow);
                }
                catch (IOException e) {
                    throw CommonError.fileOperationFailed("JsonFile", "read", path, e);
                }
            });
        }
    }

    /**
     * Opens {@code path} and wraps it according to the configured compression format. Only LZO
     * is decompressed; any other format besides NONE is logged and the file is read as-is.
     *
     * @param path file to open
     * @return stream yielding the (decompressed) file content; the caller must close it
     * @throws IOException if the file cannot be opened or the decompressor cannot be created
     */
    private InputStream openJsonInputStream(String path) throws IOException {
        InputStream rawStream = this.hadoopFileSystemProxy.getInputStream(path);
        switch (this.compressFormat) {
            case LZO: {
                try {
                    return new LzopCodec().createInputStream(rawStream);
                }
                catch (IOException | RuntimeException e) {
                    // Do not leak the underlying stream when the decompressor cannot be set up.
                    try {
                        rawStream.close();
                    }
                    catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                    throw e;
                }
            }
            case NONE: {
                return rawStream;
            }
            default: {
                log.warn("Json file does not support this compress type: {}", (Object) this.compressFormat.getCompressCodec());
                return rawStream;
            }
        }
    }

    /**
     * Schema inference is not available for JSON files.
     *
     * @param path ignored
     * @return never returns normally
     * @throws FileConnectorException always, because the schema must be defined by the user
     */
    @Override
    public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException {
        throw new FileConnectorException((SeaTunnelErrorCode) CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, "User must defined schema for json file type");
    }
}

