/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.sls.source;

import com.alibaba.ververica.connectors.common.errorcode.ConnectorErrors;
import com.alibaba.ververica.connectors.common.source.reader.AbstractPartitionNumsListener;
import com.alibaba.ververica.connectors.common.source.reader.Interruptible;
import com.alibaba.ververica.connectors.common.source.reader.RecordReader;
import com.alibaba.ververica.connectors.sls.SLSAccessInfo;
import com.alibaba.ververica.connectors.sls.SLSUtils;
import com.alibaba.ververica.connectors.sls.source.SlsClientProxy;
import com.alibaba.ververica.connectors.sls.source.SourceRecord;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.BatchGetLogResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SLSRecordReader
extends AbstractPartitionNumsListener
implements RecordReader<SourceRecord, String>,
Interruptible {
    private static final Logger LOG = LoggerFactory.getLogger(SLSRecordReader.class);
    private final String project;
    private final String logstore;
    private final int startInSec;
    private final int stopInSec;
    private final int maxRetryTime;
    private final int batchGetSize;
    private int shardId = -1;
    private String lastSuccessfulCursor;
    private String nextBeginCursor;
    private String stopCursor;
    private long currentWatermark;
    private int lastSuccessMessageTimestamp;
    private volatile boolean interrupted = false;
    private transient List<FastLogGroup> buffer;
    private long lastLogPrintTime = 0L;
    private long dataFetchedDelay = 0L;
    private long mLastFetchRawSize = Long.MAX_VALUE;
    private long mLastFetchCount = Long.MAX_VALUE;
    private long mLastFetchTime = Long.MIN_VALUE;
    private transient Shard shard;
    private final transient SlsClientProxy clientProxy;
    private boolean isReadOnlyShard = false;
    private boolean shardExpired = false;
    private String endCursor;

    public SLSRecordReader(SLSAccessInfo accessInfo, int startInSec, int stopInSec, List<Shard> initShardList, Configuration conf) {
        this.project = accessInfo.getProjectName();
        this.logstore = accessInfo.getLogstore();
        this.maxRetryTime = accessInfo.getMaxRetries();
        this.batchGetSize = accessInfo.getBatchGetSize();
        this.setInitPartitionCount(null == initShardList ? 0 : initShardList.size());
        this.startInSec = startInSec;
        this.stopInSec = stopInSec;
        this.buffer = new ArrayList<FastLogGroup>();
        this.clientProxy = new SlsClientProxy(accessInfo, conf);
    }

    @Override
    public int getPartitionsNums() {
        List<Shard> shards = this.clientProxy.listShards();
        return shards == null ? 0 : shards.size();
    }

    @Override
    public String getReaderName() {
        return "SLSRecordReader-" + this.project + "." + this.logstore;
    }

    private void initStopCursor() {
        this.stopCursor = this.stopInSec <= 0 || this.stopInSec >= Integer.MAX_VALUE ? null : this.clientProxy.getCursor(this.stopInSec, this.shardId);
    }

    @Override
    public void open(InputSplit split, RuntimeContext context) throws IOException {
        List<Shard> shardsList = this.clientProxy.listShards();
        this.shardId = split.getSplitNumber();
        for (Shard shard : shardsList) {
            if (shard.GetShardId() != this.shardId) continue;
            this.shard = shard;
            break;
        }
        if (this.shard == null) {
            this.shardExpired = true;
            LOG.error("The shard has been expired {}", (Object)this.shardId);
            return;
        }
        String status = this.shard.getStatus();
        if ("readonly".equalsIgnoreCase(status)) {
            LOG.info("ShardId {} status = {}", (Object)this.shardId, (Object)status);
            this.isReadOnlyShard = true;
            this.endCursor = this.clientProxy.getCursor(Consts.CursorMode.END, this.shardId);
        } else {
            LOG.info("ShardId {} status = {}", (Object)this.shardId, (Object)status);
            this.isReadOnlyShard = false;
        }
        this.nextBeginCursor = this.clientProxy.getCursor(this.startInSec, this.shardId);
        this.initStopCursor();
        this.initPartitionNumsListener();
    }

    private long getThrottlingInterval() {
        if (this.mLastFetchRawSize < 0x100000L && this.mLastFetchCount < (long)this.batchGetSize) {
            return 200L + this.mLastFetchTime - System.currentTimeMillis();
        }
        if (this.mLastFetchRawSize < 0x200000L && this.mLastFetchCount < (long)this.batchGetSize) {
            return 100L + this.mLastFetchTime - System.currentTimeMillis();
        }
        return 0L;
    }

    private boolean shouldStop() {
        return this.interrupted || this.shardExpired || this.nextBeginCursor.equalsIgnoreCase(this.stopCursor) || this.isReadOnlyShard && this.nextBeginCursor.equalsIgnoreCase(this.endCursor);
    }

    @Override
    public boolean next() {
        int currRetryTime = 0;
        while (this.buffer.isEmpty()) {
            if (this.isPartitionChanged()) {
                throw new RuntimeException(ConnectorErrors.INST.sourcePartitionChangeFailOverRecoryError(this.getReaderName(), String.valueOf(this.initPartitionCount), String.valueOf(this.getPartitionsNums())));
            }
            if (this.shouldStop()) {
                LOG.warn("Stopping reader for shard {}", (Object)this.shardId);
                return false;
            }
            long waitInterval = this.getThrottlingInterval();
            if (waitInterval > 0L) {
                SLSUtils.backoff(waitInterval);
            }
            try {
                BatchGetLogResponse response = this.clientProxy.pullData(this.shardId, this.batchGetSize, this.nextBeginCursor, this.stopCursor);
                this.mLastFetchRawSize = response.GetRawSize();
                this.mLastFetchCount = response.GetCount();
                this.mLastFetchTime = System.currentTimeMillis();
                int count = response.GetCount();
                if (count > 0) {
                    this.lastSuccessfulCursor = this.nextBeginCursor;
                    int timestamp = SLSRecordReader.extractArrivalServerTime(response.GetLogGroup(count - 1));
                    if (timestamp > 0) {
                        this.lastSuccessMessageTimestamp = timestamp;
                    } else {
                        timestamp = this.clientProxy.getCursorTime(this.shardId, response.GetNextCursor());
                        if (timestamp > 0) {
                            this.lastSuccessMessageTimestamp = timestamp;
                        }
                    }
                    this.currentWatermark = (long)this.lastSuccessMessageTimestamp * 1000L;
                    this.dataFetchedDelay = System.currentTimeMillis() - this.currentWatermark;
                    for (LogGroupData item : response.GetLogGroups()) {
                        this.buffer.add(item.GetFastLogGroup());
                    }
                }
                this.nextBeginCursor = response.GetNextCursor();
            }
            catch (LogException ex) {
                if ("ShardNotExist".equalsIgnoreCase(ex.GetErrorCode())) {
                    this.shardExpired = true;
                    LOG.error("Shard has expired {}", (Object)this.shardId);
                    return false;
                }
                if (++currRetryTime <= this.maxRetryTime) {
                    LOG.warn("Fail in fetching data from sls, retrying...", ex);
                    SLSUtils.backoff(currRetryTime * 1000);
                    continue;
                }
                throw new RuntimeException("Error pulling data, errorCode: " + ex.GetErrorCode() + ", errorMessage: " + ex.GetErrorMessage());
            }
        }
        if (System.currentTimeMillis() - this.lastLogPrintTime >= 60000L) {
            this.clientProxy.updateCheckpointIfNeeded(this.shardId, this.lastSuccessfulCursor);
            this.lastLogPrintTime = System.currentTimeMillis();
            LOG.info(String.format("Next method get current cursor, project[%s]-logStore[%s]-shardId[%d]-progress[%d]-delay[%d]-Cursor[%s]-nextCursor[%s]-EndCursor[%s]", this.project, this.logstore, this.shardId, this.lastSuccessMessageTimestamp, System.currentTimeMillis() - (long)this.lastSuccessMessageTimestamp, this.lastSuccessfulCursor, this.nextBeginCursor, this.stopCursor));
        }
        return true;
    }

    @Override
    public SourceRecord getMessage() {
        SourceRecord record = new SourceRecord(this.buffer);
        this.buffer = new ArrayList<FastLogGroup>();
        return record;
    }

    private static int extractArrivalServerTime(LogGroupData data) {
        if (data == null) {
            return -1;
        }
        FastLogGroup logGroup = data.GetFastLogGroup();
        for (int i = logGroup.getLogTagsCount() - 1; i >= 0; --i) {
            FastLogTag tag = logGroup.getLogTags(i);
            if (!"__receive_time__".equalsIgnoreCase(tag.getKey())) continue;
            try {
                return Integer.parseInt(tag.getValue());
            }
            catch (Exception ex) {
                LOG.warn("Invalid receive time: {}", (Object)tag.getValue());
                break;
            }
        }
        return -1;
    }

    @Override
    public void close() throws IOException {
        this.destroyPartitionNumsListener();
    }

    @Override
    public void seek(String s2) throws IOException {
        this.nextBeginCursor = s2.equalsIgnoreCase("new_sls_start_flag") ? this.clientProxy.getCursor(Consts.CursorMode.BEGIN, this.shardId) : s2;
    }

    @Override
    public String getProgress() throws IOException {
        return this.nextBeginCursor;
    }

    @Override
    public long getDelay() {
        return this.currentWatermark;
    }

    @Override
    public long getFetchedDelay() {
        return this.dataFetchedDelay;
    }

    @Override
    public boolean isHeartBeat() {
        return false;
    }

    @Override
    public long getWatermark() {
        return this.currentWatermark;
    }

    @Override
    public void interrupt() {
        this.interrupted = true;
    }
}

