/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.redis.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

public class RedisSinkWriter
extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
    private final SeaTunnelRowType seaTunnelRowType;
    private final RedisParameters redisParameters;
    private final SerializationSchema serializationSchema;
    private final RedisClient redisClient;
    private final int batchSize;
    private final List<String> keyBuffer;
    private final List<String> valueBuffer;

    public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters redisParameters) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.redisParameters = redisParameters;
        this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
        this.redisClient = redisParameters.buildRedisClient();
        this.batchSize = redisParameters.getBatchSize();
        this.keyBuffer = new ArrayList<String>(this.batchSize);
        this.valueBuffer = new ArrayList<String>(this.batchSize);
    }

    public void write(SeaTunnelRow element) throws IOException {
        String data = new String(this.serializationSchema.serialize(element));
        String keyField = this.redisParameters.getKeyField();
        List<String> fields = Arrays.asList(this.seaTunnelRowType.getFieldNames());
        String key = fields.contains(keyField) ? element.getField(fields.indexOf(keyField)).toString() : keyField;
        this.keyBuffer.add(key);
        this.valueBuffer.add(data);
        if (this.keyBuffer.size() >= this.batchSize) {
            this.doBatchWrite();
            this.clearBuffer();
        }
    }

    private void clearBuffer() {
        this.keyBuffer.clear();
        this.valueBuffer.clear();
    }

    private void doBatchWrite() {
        RedisDataType redisDataType = this.redisParameters.getRedisDataType();
        if (RedisDataType.KEY.equals((Object)redisDataType) || RedisDataType.STRING.equals((Object)redisDataType)) {
            this.redisClient.batchWriteString(this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        if (RedisDataType.LIST.equals((Object)redisDataType)) {
            this.redisClient.batchWriteList(this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        if (RedisDataType.SET.equals((Object)redisDataType)) {
            this.redisClient.batchWriteSet(this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        if (RedisDataType.HASH.equals((Object)redisDataType)) {
            this.redisClient.batchWriteHash(this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        if (RedisDataType.ZSET.equals((Object)redisDataType)) {
            this.redisClient.batchWriteZset(this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        throw new RedisConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, "UnSupport redisDataType,only support string,list,hash,set,zset");
    }

    public void close() throws IOException {
        if (!this.keyBuffer.isEmpty()) {
            this.doBatchWrite();
            this.clearBuffer();
        }
    }
}

