/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.sls.source;

import com.alibaba.ververica.connectors.common.source.resolver.RecordResolver;
import com.alibaba.ververica.connectors.common.table.AbstractVervericaTableSource;
import com.alibaba.ververica.connectors.common.table.VervericaTableOptions;
import com.alibaba.ververica.connectors.common.util.DescriptorPropertiesUtil;
import com.alibaba.ververica.connectors.sls.SLSAccessInfo;
import com.alibaba.ververica.connectors.sls.SLSMetadata;
import com.alibaba.ververica.connectors.sls.SLSOptions;
import com.alibaba.ververica.connectors.sls.SLSUtils;
import com.alibaba.ververica.connectors.sls.source.SLSRecordParserFactory;
import com.alibaba.ververica.connectors.sls.source.SlsSourceFunction;
import com.alibaba.ververica.connectors.sls.source.SourceRecord;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.types.DataType;

public class SLSTableSource
extends AbstractVervericaTableSource<SourceRecord, RowData>
implements SupportsReadingMetadata {
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private String tableName;
    private int startInSec;
    private int stopInSec;
    private SLSAccessInfo accessInfo;
    private List<String> metadataKeys;

    public SLSTableSource(DescriptorProperties descriptorProperties) {
        super(descriptorProperties);
        Configuration props = DescriptorPropertiesUtil.toConfiguration(descriptorProperties);
        this.accessInfo = SLSUtils.parseConsumerConfig(props);
        String startDateTime = props.getString(VervericaTableOptions.OPTIONAL_START_TIME);
        String stopDateTime = props.getString(SLSOptions.STOP_TIME);
        String timeZone = props.getString(SLSOptions.OPTIONAL_TIME_ZONE);
        this.startInSec = SLSUtils.getStartTimeInSecs(startDateTime, timeZone);
        this.stopInSec = SLSUtils.getStopTimeInSecs(stopDateTime, timeZone);
        this.tableName = props.getString(VervericaTableOptions.TABLE_NAME);
        this.metadataKeys = new ArrayList<String>();
    }

    public String explainSource() {
        return String.format("SLSTableSource-%s", this.tableName);
    }

    @Override
    public RecordResolver<SourceRecord, RowData> createRecordConverter() {
        return SLSRecordParserFactory.createParser(this.getTableSchema(), this.properties.asMap(), this.metadataKeys);
    }

    @Override
    public SourceFunction<SourceRecord> createSourceFunction() {
        return new SlsSourceFunction(this.accessInfo, this.startInSec, this.stopInSec);
    }

    @Override
    public boolean isBounded() {
        return this.startInSec > 0 && this.stopInSec != Integer.MAX_VALUE;
    }

    @Override
    public Class<RowData> getOutputClass() {
        return RowData.class;
    }

    public Map<String, DataType> listReadableMetadata() {
        LinkedHashMap<String, DataType> metadataMap = new LinkedHashMap<String, DataType>();
        Stream.of(SLSMetadata.values()).forEachOrdered(m3 -> metadataMap.putIfAbsent(m3.getKey(), m3.getDataType()));
        return metadataMap;
    }

    public void applyReadableMetadata(List<String> list, DataType dataType) {
        this.metadataKeys.clear();
        this.metadataKeys.addAll(list);
    }
}

