/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.sls.source;

import com.alibaba.ververica.connectors.common.source.AbstractDynamicParallelSource;
import com.alibaba.ververica.connectors.common.source.reader.RecordReader;
import com.alibaba.ververica.connectors.sls.SLSAccessInfo;
import com.alibaba.ververica.connectors.sls.source.SLSClientProvider;
import com.alibaba.ververica.connectors.sls.source.SLSRecordReader;
import com.alibaba.ververica.connectors.sls.source.SlsInputSplit;
import com.alibaba.ververica.connectors.sls.source.SourceRecord;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.ListShardResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Parallel source that reads records from an SLS logstore.
 *
 * <p>On {@link #open(Configuration)} the function creates the configured consumer group (when
 * one is set), lists the shards of the logstore and caches them sorted by shard id. Shards are
 * distributed over the parallel subtasks by {@code shardId % parallelism}.
 */
public class SlsSourceFunction
extends AbstractDynamicParallelSource<SourceRecord, String> {
    private static final long serialVersionUID = 6289824294498842746L;
    private static final Logger LOG = LoggerFactory.getLogger(SlsSourceFunction.class);

    /** Cursor placeholder handed out for shards that have no progress in the restored state. */
    public static final String NEW_SLS_START_FLAG = "new_sls_start_flag";

    private final SLSAccessInfo accessInfo;
    private final String consumerGroup;
    private final int maxRetries;
    private final int startInSec;
    private final int stopInSec;
    /** Shards fetched in {@link #open(Configuration)}, sorted by shard id. */
    private List<Shard> initShardList = new ArrayList<>();
    private transient Client client;
    private transient SLSClientProvider clientProvider;

    /**
     * Creates the source function.
     *
     * @param accessInfo connection settings; also supplies the consumer group name and the
     *     maximum number of attempts used when listing shards
     * @param startInSec start position forwarded unchanged to the record reader
     * @param stopInSec stop position forwarded unchanged to the record reader
     */
    public SlsSourceFunction(SLSAccessInfo accessInfo, int startInSec, int stopInSec) {
        this.accessInfo = accessInfo;
        this.consumerGroup = accessInfo.getConsumerGroup();
        this.maxRetries = accessInfo.getMaxRetries();
        this.startInSec = startInSec;
        this.stopInSec = stopInSec;
        // NOTE(review): the meaning of the argument 1 is defined by the parent class - confirm there.
        this.enableTracingMetrics(1);
    }

    /**
     * Returns the SLS client, obtaining it from the client provider on first use.
     *
     * @throws IllegalStateException if called before {@link #open(Configuration)} has created
     *     the client provider
     */
    private Client getClient() {
        if (this.client == null) {
            if (this.clientProvider == null) {
                throw new IllegalStateException("SLS client provider is not initialized; open() must be called first");
            }
            this.client = (Client)this.clientProvider.getClient();
        }
        return this.client;
    }

    /**
     * Creates the configured consumer group; does nothing when no consumer group is configured.
     * A "ConsumerGroupAlreadyExist" error from the service is treated as success.
     *
     * @throws RuntimeException wrapping any other {@link LogException}
     */
    private void createConsumerGroup() {
        if (this.consumerGroup == null) {
            return;
        }
        Client slsClient = this.getClient();
        try {
            // NOTE(review): 60 / false are presumably the heartbeat timeout and the in-order flag - confirm against the SDK.
            ConsumerGroup group = new ConsumerGroup(this.consumerGroup, 60, false);
            slsClient.CreateConsumerGroup(this.accessInfo.getProjectName(), this.accessInfo.getLogstore(), group);
        }
        catch (LogException e) {
            // Constant-first comparison keeps a null error code from masking the real failure with an NPE.
            if (!"ConsumerGroupAlreadyExist".equalsIgnoreCase(e.GetErrorCode())) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Creates the record reader for this subtask, handing it the shard list cached by
     * {@link #open(Configuration)}.
     */
    @Override
    public RecordReader<SourceRecord, String> createReader(Configuration config) throws IOException {
        return new SLSRecordReader(this.accessInfo, this.startInSec, this.stopInSec, this.initShardList, config);
    }

    /**
     * Builds one input split per shard assigned to the given subtask.
     *
     * @param numberOfParallelSubTasks total number of parallel subtasks
     * @param indexOfThisSubTask index of the subtask the splits are created for
     * @return the splits of the shards whose id maps to {@code indexOfThisSubTask}
     */
    @Override
    public InputSplit[] createInputSplitsForCurrentSubTask(int numberOfParallelSubTasks, int indexOfThisSubTask) throws IOException {
        List<Shard> assigned = this.modAssign(numberOfParallelSubTasks, indexOfThisSubTask);
        InputSplit[] inputSplits = new SlsInputSplit[assigned.size()];
        int next = 0;
        for (Shard shard : assigned) {
            inputSplits[next++] = new SlsInputSplit(shard.GetShardId());
        }
        return inputSplits;
    }

    /**
     * Computes the splits and start cursors for the given subtask after a restore.
     *
     * <p>For every shard assigned to the subtask, the first entry of {@code allSplitsInState}
     * whose split number equals the shard id supplies the split and cursor. Shards without such
     * an entry get a fresh split paired with {@link #NEW_SLS_START_FLAG}.
     *
     * @param numberOfParallelSubTasks total number of parallel subtasks
     * @param indexOfThisSubTask index of the subtask the splits are assigned to
     * @param allSplitsInState progress entries restored from state
     * @return split/cursor pairs in the order of the assigned shards
     */
    @Override
    public List<Tuple2<InputSplit, String>> reAssignInputSplitsForCurrentSubTask(int numberOfParallelSubTasks, int indexOfThisSubTask, List<AbstractDynamicParallelSource.InnerProgress<String>> allSplitsInState) throws IOException {
        List<Tuple2<InputSplit, String>> initialProgress = new ArrayList<>();
        for (Shard shard : this.modAssign(numberOfParallelSubTasks, indexOfThisSubTask)) {
            int shardId = shard.GetShardId();
            Tuple2<InputSplit, String> restored = null;
            for (AbstractDynamicParallelSource.InnerProgress<String> progress : allSplitsInState) {
                if (shardId == progress.getInputSplit().getSplitNumber()) {
                    restored = Tuple2.<InputSplit, String>of(progress.getInputSplit(), progress.getCursor());
                    break;
                }
            }
            if (restored == null) {
                restored = Tuple2.<InputSplit, String>of(new SlsInputSplit(shardId), NEW_SLS_START_FLAG);
            }
            initialProgress.add(restored);
        }
        return initialProgress;
    }

    /**
     * Lists the current shards of the logstore and returns their ids as strings, in the order
     * the service reported them.
     */
    @Override
    public List<String> getPartitionList() throws Exception {
        List<String> partitions = new ArrayList<>();
        for (Shard shard : this.fetchAllShards()) {
            partitions.add(String.valueOf(shard.GetShardId()));
        }
        return partitions;
    }

    /**
     * Creates the client provider, ensures the consumer group exists, caches the shard list and
     * then delegates to the parent class.
     */
    @Override
    public void open(Configuration config) throws IOException {
        this.clientProvider = new SLSClientProvider(this.accessInfo, config);
        this.createConsumerGroup();
        this.fetchShards();
        super.open(config);
    }

    /**
     * Closes the parent source and shuts down the SLS client. The client is shut down even when
     * the parent's {@code close()} throws, so the client is not leaked on a failed shutdown.
     */
    @Override
    public void close() throws IOException {
        try {
            super.close();
        }
        finally {
            if (this.client != null) {
                this.client.shutdown();
                // Drop the reference so a repeated close() does not shut the client down twice.
                this.client = null;
            }
        }
    }

    /**
     * Lists all shards of the configured logstore, retrying on {@link LogException}.
     *
     * <p>At most {@code maxRetries} attempts are made, and always at least one.
     *
     * @return the shards reported by the service
     * @throws RuntimeException wrapping the last {@link LogException} once the attempts are used up
     */
    private List<Shard> fetchAllShards() {
        Client slsClient = this.getClient();
        String project = this.accessInfo.getProjectName();
        String logstore = this.accessInfo.getLogstore();
        int maxAttempts = Math.max(1, this.maxRetries);
        // The loop exits only via return (success) or throw (attempts exhausted).
        for (int attempt = 1; ; ++attempt) {
            try {
                LOG.info("Fetching shards for project: {}, logstore: {}", project, logstore);
                ListShardResponse response = slsClient.ListShard(project, logstore);
                return response.GetShards();
            }
            catch (LogException ex) {
                if (attempt >= maxAttempts) {
                    throw new RuntimeException("Failed to list shards for project " + project + ", logstore " + logstore + " after " + attempt + " attempt(s)", ex);
                }
                LOG.warn("Error listing shards, code = {}, msg={}", ex.GetErrorCode(), ex.GetErrorMessage(), ex);
            }
        }
    }

    /** Fetches the shard list and caches a copy of it sorted by ascending shard id. */
    private void fetchShards() {
        // Sort a copy so the list owned by the SDK response is never mutated.
        List<Shard> shards = new ArrayList<>(this.fetchAllShards());
        shards.sort(Comparator.comparingInt(Shard::GetShardId));
        this.initShardList = shards;
    }

    /**
     * Selects the cached shards whose id satisfies {@code shardId % consumerCount == consumerIndex}.
     *
     * @param consumerCount number of consumers the shards are spread over
     * @param consumerIndex index of the consumer to select shards for
     * @return the selected shards, in the order of the cached shard list
     */
    private List<Shard> modAssign(int consumerCount, int consumerIndex) {
        List<Shard> assignedShards = new ArrayList<>();
        for (Shard shard : this.initShardList) {
            if (shard.GetShardId() % consumerCount == consumerIndex) {
                assignedShards.add(shard);
            }
        }
        return assignedShards;
    }

    /** Returns a short description naming the project and logstore this source reads from. */
    @Override
    public String toString() {
        return String.format("SLS Source from %s.%s", this.accessInfo.getProjectName(), this.accessInfo.getLogstore());
    }
}

