/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.redis.sink;

import com.google.common.collect.Lists;
import io.lettuce.core.RedisFuture;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.redis.RedisSession;
import org.apache.pulsar.io.redis.sink.RedisSinkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="redis", type=IOType.SINK, help="A sink connector is used for moving messages from Pulsar to Redis.", configClass=RedisSinkConfig.class)
public class RedisSink
implements Sink<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(RedisSink.class);
    private RedisSinkConfig redisSinkConfig;
    private RedisSession redisSession;
    private long batchTimeMs;
    private long operationTimeoutMs;
    private int batchSize;
    private List<Record<byte[]>> incomingList;
    private ScheduledExecutorService flushExecutor;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        log.info("Open Redis Sink");
        this.redisSinkConfig = RedisSinkConfig.load(config, sinkContext);
        this.redisSinkConfig.validate();
        this.redisSession = RedisSession.create(this.redisSinkConfig);
        this.operationTimeoutMs = this.redisSinkConfig.getOperationTimeout();
        this.batchTimeMs = this.redisSinkConfig.getBatchTimeMs();
        this.batchSize = this.redisSinkConfig.getBatchSize();
        this.incomingList = Lists.newArrayList();
        this.flushExecutor = Executors.newScheduledThreadPool(1);
        this.flushExecutor.scheduleAtFixedRate(this::flush, this.batchTimeMs, this.batchTimeMs, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(Record<byte[]> record) throws Exception {
        int currentSize;
        RedisSink redisSink = this;
        synchronized (redisSink) {
            this.incomingList.add(record);
            currentSize = this.incomingList.size();
        }
        if (currentSize == this.batchSize) {
            this.flushExecutor.execute(this::flush);
        }
    }

    public void close() throws Exception {
        if (null != this.redisSession) {
            this.redisSession.close();
        }
        if (null != this.flushExecutor) {
            this.flushExecutor.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush() {
        List<Record<byte[]>> recordsToFlush;
        ConcurrentHashMap<byte[], byte[]> recordsToSet = new ConcurrentHashMap<byte[], byte[]>();
        RedisSink redisSink = this;
        synchronized (redisSink) {
            if (this.incomingList.isEmpty()) {
                return;
            }
            recordsToFlush = this.incomingList;
            this.incomingList = Lists.newArrayList();
        }
        if (CollectionUtils.isNotEmpty(recordsToFlush)) {
            for (Record record : recordsToFlush) {
                try {
                    String recordKey = record.getKey().isPresent() ? (String)record.getKey().get() : "";
                    byte[] key = recordKey.getBytes(StandardCharsets.UTF_8);
                    byte[] value = (byte[])record.getValue();
                    recordsToSet.put(key, value);
                }
                catch (Exception e) {
                    record.fail();
                    recordsToFlush.remove(record);
                    log.warn("Record flush thread was exception ", (Throwable)e);
                }
            }
        }
        try {
            if (recordsToSet.size() > 0) {
                RedisFuture future;
                if (log.isDebugEnabled()) {
                    log.debug("Calling mset with {} values", (Object)recordsToSet.size());
                }
                if (!(future = this.redisSession.asyncCommands().mset(recordsToSet)).await(this.operationTimeoutMs, TimeUnit.MILLISECONDS) || future.getError() != null) {
                    log.warn("Operation failed with error {} or timeout {} is exceeded", (Object)future.getError(), (Object)this.operationTimeoutMs);
                    recordsToFlush.forEach(Record::fail);
                    return;
                }
            }
            recordsToFlush.forEach(Record::ack);
            recordsToSet.clear();
            recordsToFlush.clear();
        }
        catch (InterruptedException e) {
            recordsToFlush.forEach(Record::fail);
            log.error("Redis mset data interrupted exception ", (Throwable)e);
        }
    }
}

