/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.sls.source;

import com.alibaba.ververica.connectors.sls.SLSAccessInfo;
import com.alibaba.ververica.connectors.sls.SLSUtils;
import com.alibaba.ververica.connectors.sls.source.SLSClientProvider;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.BatchGetLogResponse;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thin wrapper around the SLS {@link Client} that adds retry handling to the
 * handful of calls the source connector needs (listing shards, resolving
 * cursors, pulling data and committing consumer-group checkpoints).
 *
 * <p>Every call creates its own {@link RequestContext}, which tracks the number
 * of retries and the time at which the request started, and decides whether a
 * failed request may be attempted again.
 */
class SlsClientProxy {
    private static final Logger LOG = LoggerFactory.getLogger(SlsClientProxy.class);
    private static final String ERROR_SHARD_NOT_EXIST = "ShardNotExist";
    private final String project;
    private final String logstore;
    private final String consumerGroup;
    private final transient SLSClientProvider clientProvider;
    /** Retry cap for failures that are neither network errors, server errors nor HTTP 403. */
    static final int MAX_RETRIES_FOR_NON_SERVER_ERROR = 3;
    /** Upper bound, in milliseconds, on how long after its start a request may still be retried. */
    private static final long MAX_RETRY_TIMEOUT_MILLIS = 600000L;

    /**
     * Creates a proxy bound to the project, logstore and consumer group described
     * by {@code accessInfo}.
     *
     * @param accessInfo supplies the project name, logstore and consumer group
     * @param conf       configuration handed to the {@link SLSClientProvider}
     */
    public SlsClientProxy(SLSAccessInfo accessInfo, Configuration conf) {
        this.project = accessInfo.getProjectName();
        this.logstore = accessInfo.getLogstore();
        this.consumerGroup = accessInfo.getConsumerGroup();
        this.clientProvider = new SLSClientProvider(accessInfo, conf);
    }

    /**
     * Lists the shards of the configured logstore, retrying recoverable failures.
     *
     * @return the shards reported by the service
     * @throws RuntimeException wrapping the {@link LogException} once the request
     *                          may no longer be retried
     */
    List<Shard> listShards() {
        RequestContext ctx = new RequestContext(this.clientProvider);
        while (true) {
            try {
                return ctx.getClient().ListShard(this.project, this.logstore).GetShards();
            } catch (LogException ex) {
                if (ctx.waitForNextRetry(ex)) {
                    continue;
                }
                LOG.error("Error while listing shards, code = {}", ex.GetErrorCode(), ex);
                throw new RuntimeException(ex);
            }
        }
    }

    /**
     * Resolves the cursor of a shard for the given cursor mode, retrying
     * recoverable failures.
     *
     * @param cursorMode which cursor to resolve
     * @param shard      shard id
     * @return the cursor string returned by the service
     * @throws RuntimeException wrapping the {@link LogException} once the request
     *                          may no longer be retried
     */
    String getCursor(Consts.CursorMode cursorMode, int shard) {
        RequestContext ctx = new RequestContext(this.clientProvider);
        while (true) {
            try {
                return ctx.getClient().GetCursor(this.project, this.logstore, shard, cursorMode).GetCursor();
            } catch (LogException ex) {
                if (ctx.waitForNextRetry(ex)) {
                    continue;
                }
                LOG.error("Error while fetching cursor, code = {}", ex.GetErrorCode(), ex);
                throw new RuntimeException(ex);
            }
        }
    }

    /**
     * Resolves the cursor of a shard for the given timestamp, retrying
     * recoverable failures.
     *
     * @param ts    timestamp passed through to the SDK
     *              (NOTE(review): presumably Unix time in seconds - confirm against the SDK)
     * @param shard shard id
     * @return the cursor string returned by the service
     * @throws RuntimeException wrapping the {@link LogException} once the request
     *                          may no longer be retried
     */
    String getCursor(int ts, int shard) {
        RequestContext ctx = new RequestContext(this.clientProvider);
        while (true) {
            try {
                return ctx.getClient().GetCursor(this.project, this.logstore, shard, ts).GetCursor();
            } catch (LogException ex) {
                if (ctx.waitForNextRetry(ex)) {
                    continue;
                }
                LOG.error("Error while fetching cursor, code = {}", ex.GetErrorCode(), ex);
                throw new RuntimeException(ex);
            }
        }
    }

    /**
     * Looks up the time associated with a cursor, retrying recoverable failures.
     *
     * <p>Unlike the other lookups this method does not throw: when the request
     * can no longer be retried the error is logged and {@code -1} is returned.
     *
     * @param shard  shard id
     * @param cursor cursor to resolve
     * @return the cursor time reported by the service, or {@code -1} on failure
     */
    int getCursorTime(int shard, String cursor) {
        RequestContext ctx = new RequestContext(this.clientProvider);
        while (true) {
            try {
                return ctx.getClient().GetCursorTime(this.project, this.logstore, shard, cursor).GetCursorTime();
            } catch (LogException ex) {
                if (ctx.waitForNextRetry(ex)) {
                    continue;
                }
                LOG.error("Error while fetching cursor time", ex);
                return -1;
            }
        }
    }

    /**
     * Commits {@code cursor} as the consumer-group checkpoint of {@code shard}.
     *
     * <p>Does nothing when no consumer group is configured. This is best-effort:
     * once the request can no longer be retried the failure is only logged
     * (as a warning for {@code ShardNotExist}, as an error otherwise) and the
     * method returns normally.
     *
     * @param shard  shard id
     * @param cursor cursor to store as the checkpoint
     */
    void updateCheckpointIfNeeded(int shard, String cursor) {
        if (this.consumerGroup == null || this.consumerGroup.isEmpty()) {
            return;
        }
        RequestContext ctx = new RequestContext(this.clientProvider);
        while (true) {
            try {
                ctx.getClient().UpdateCheckPoint(this.project, this.logstore, this.consumerGroup, shard, cursor);
                return;
            } catch (LogException ex) {
                if (ctx.waitForNextRetry(ex)) {
                    continue;
                }
                if (ERROR_SHARD_NOT_EXIST.equalsIgnoreCase(ex.GetErrorCode())) {
                    LOG.warn("Shard not exist, skip checkpointting", ex);
                    return;
                }
                // Deliberately swallowed: a failed checkpoint commit must not fail the caller.
                LOG.error("Error while updating checkpoint", ex);
                return;
            }
        }
    }

    /**
     * Pulls a batch of logs from a shard, retrying recoverable failures.
     *
     * @param shard      shard id
     * @param batchSize  batch size passed through to the SDK
     * @param fromCursor cursor to start reading from
     * @param endCursor  cursor to stop reading at
     * @return the SDK response for the batch
     * @throws LogException the last failure, once the request may no longer be retried
     */
    BatchGetLogResponse pullData(int shard, int batchSize, String fromCursor, String endCursor) throws LogException {
        RequestContext ctx = new RequestContext(this.clientProvider);
        while (true) {
            try {
                return ctx.getClient().BatchGetLog(this.project, this.logstore, shard, batchSize, fromCursor, endCursor);
            } catch (LogException ex) {
                if (ctx.waitForNextRetry(ex)) {
                    continue;
                }
                LOG.error("Error while pulling data from SLS", ex);
                throw ex;
            }
        }
    }

    /**
     * Per-request retry state: remembers when the request started, how many
     * retries have been made and which {@link Client} is in use.
     */
    public static class RequestContext {
        /** Status value this class treats as a network-level failure. */
        private static final int HTTP_CODE_NETWORK_ERROR = -1;
        private static final int HTTP_CODE_BAD_REQUEST = 400;
        private static final int HTTP_CODE_UNAUTHORIZED = 401;
        /** Treated as throttling by {@link #shouldRetry(LogException)}. */
        private static final int HTTP_CODE_FORBIDDEN = 403;
        private static final int HTTP_CODE_SERVER_ERROR_MIN = 500;
        private static final int BACKOFF_BASE = 500;
        private static final int BACKOFF_STEP = 1000;

        private final long beginTime = System.currentTimeMillis();
        private int retries = 0;
        private Client client = null;
        private final SLSClientProvider clientProvider;

        /**
         * @param clientProvider source of the {@link Client} used by this request
         */
        public RequestContext(SLSClientProvider clientProvider) {
            this.clientProvider = clientProvider;
        }

        /**
         * Returns the client for this request, fetching it from the provider on
         * first use and caching it afterwards.
         *
         * @return the client to issue the request with
         */
        public Client getClient() {
            if (this.client == null) {
                this.client = (Client)this.clientProvider.getClient();
            }
            return this.client;
        }

        /**
         * Decides whether the failed request should be retried and, if so, backs
         * off before returning.
         *
         * <p>The value handed to {@code SLSUtils.backoff} grows with every retry:
         * {@code retries * 1000 + 500}
         * (NOTE(review): presumably a sleep in milliseconds - confirm in SLSUtils).
         * After an HTTP 401 the cached client is replaced via
         * {@code clientProvider.getClient(true, true)}
         * (NOTE(review): presumably forces a credential refresh - confirm in SLSClientProvider).
         *
         * @param ex the failure of the last attempt
         * @return {@code true} if the caller should retry, {@code false} if it should give up
         */
        public boolean waitForNextRetry(LogException ex) {
            if (!this.shouldRetry(ex)) {
                return false;
            }
            LOG.warn("Retrying for recoverable exception", ex);
            // Compute the delay from the current count, then bump the count before
            // backing off (same order as a post-increment inside the argument).
            int delay = this.retries * BACKOFF_STEP + BACKOFF_BASE;
            this.retries++;
            SLSUtils.backoff(delay);
            if (ex.GetHttpCode() == HTTP_CODE_UNAUTHORIZED) {
                this.client = (Client)this.clientProvider.getClient(true, true);
            }
            return true;
        }

        /**
         * Overrides the retry counter.
         *
         * @param retries the new retry count
         */
        public void setRetries(int retries) {
            this.retries = retries;
        }

        /**
         * Tells whether a failure is retryable at this point in time.
         *
         * <p>Network errors (status {@code -1}), server errors (status
         * {@code >= 500}) and HTTP 403 are retryable regardless of the retry
         * count. Any other status except 400 is retryable only while fewer than
         * {@link SlsClientProxy#MAX_RETRIES_FOR_NON_SERVER_ERROR} retries have
         * been made. In every case, retrying stops once more than the maximum
         * retry timeout has elapsed since this context was created.
         *
         * @param ex the failure to classify
         * @return {@code true} if another attempt is allowed
         */
        public boolean shouldRetry(LogException ex) {
            int status = ex.GetHttpCode();
            boolean isNetworkError = status == HTTP_CODE_NETWORK_ERROR;
            boolean isServerError = status >= HTTP_CODE_SERVER_ERROR_MIN;
            boolean isThrottled = status == HTTP_CODE_FORBIDDEN;
            // The retry cap applies only to this last category; the parentheses make
            // the && / || precedence explicit.
            boolean isRetryableClientError =
                    status != HTTP_CODE_BAD_REQUEST && this.retries < MAX_RETRIES_FOR_NON_SERVER_ERROR;
            boolean canRetry = isNetworkError || isServerError || isThrottled || isRetryableClientError;
            return canRetry && System.currentTimeMillis() - this.beginTime <= MAX_RETRY_TIMEOUT_MILLIS;
        }
    }
}

