/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.sls.sink;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.request.PutLogsRequest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.sls.config.Config;
import org.apache.seatunnel.connectors.seatunnel.sls.serialization.SeatunnelRowSerialization;
import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsSinkState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlsSinkWriter
implements SinkWriter<SeaTunnelRow, SlsCommitInfo, SlsSinkState> {
    private static final Logger log = LoggerFactory.getLogger(SlsSinkWriter.class);
    private final Client client;
    private final String project;
    private final String logStore;
    private final String topic;
    private final String source;
    private final Integer logGroupSize;
    private final SinkWriter.Context context;
    private final List<SlsSinkState> slsStates;
    private final SeatunnelRowSerialization seatunnelRowSerialization;

    public SlsSinkWriter(SinkWriter.Context context, SeaTunnelRowType seaTunnelRowType, ReadonlyConfig pluginConfig, List<SlsSinkState> slsStates) {
        this.client = new Client((String)pluginConfig.get(Config.ENDPOINT), (String)pluginConfig.get(Config.ACCESS_KEY_ID), (String)pluginConfig.get(Config.ACCESS_KEY_SECRET));
        this.project = (String)pluginConfig.get(Config.PROJECT);
        this.logStore = (String)pluginConfig.get(Config.LOGSTORE);
        this.topic = (String)pluginConfig.get(Config.TOPIC);
        this.source = (String)pluginConfig.get(Config.SOURCE);
        this.logGroupSize = (Integer)pluginConfig.get(Config.LOG_GROUP_SIZE);
        this.context = context;
        this.slsStates = slsStates;
        this.seatunnelRowSerialization = new SeatunnelRowSerialization(seaTunnelRowType);
    }

    public void write(SeaTunnelRow element) throws IOException {
        List<LogItem> data = this.seatunnelRowSerialization.serializeRow(element);
        PutLogsRequest plr = new PutLogsRequest(this.project, this.logStore, this.topic, this.source, data);
        try {
            this.client.PutLogs(plr);
        }
        catch (Throwable e) {
            log.error("write logs failed", e);
            e.printStackTrace();
            throw new IOException(e);
        }
    }

    public Optional<SlsCommitInfo> prepareCommit() throws IOException {
        return Optional.empty();
    }

    public void abortPrepare() {
    }

    public List<SlsSinkState> snapshotState(long checkpointId) {
        return new ArrayList<SlsSinkState>();
    }

    public void close() throws IOException {
        this.client.shutdown();
    }
}

