/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.sls.source;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.ConsumerGroupCheckPointResponse;
import com.aliyun.openservices.log.response.ListConsumerGroupResponse;
import com.aliyun.openservices.log.response.ListShardResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.sls.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.sls.source.ConsumerMetaData;
import org.apache.seatunnel.connectors.seatunnel.sls.source.SlsSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.sls.source.SlsSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split enumerator for the SLS (Aliyun Log Service) source.
 *
 * <p>Every shard returned by {@code ListShard} for the configured project/logstore becomes one
 * {@link SlsSourceSplit}. Splits are distributed to readers by {@code shardId % parallelism}.
 * When the discovery interval is positive, a single background thread periodically re-lists the
 * shards and assigns any that are not yet known.
 *
 * <p>Thread-safety: {@code pendingSplit} and {@code assignedSplit} are plain {@link HashMap}s that
 * are touched both by the discovery thread and by the caller of the enumerator methods, so every
 * access to them goes through a method synchronized on this instance.
 */
public class SlsSourceSplitEnumerator
implements SourceSplitEnumerator<SlsSourceSplit, SlsSourceState> {
    private static final Logger log = LoggerFactory.getLogger(SlsSourceSplitEnumerator.class);
    /** Error code compared against when creating a consumer group that already exists. */
    private static final String ERROR_CONSUMER_GROUP_ALREADY_EXIST = "ConsumerGroupAlreadyExist";
    /** Error code compared against when reading checkpoints of a missing consumer group. */
    private static final String ERROR_CONSUMER_GROUP_NOT_EXIST = "ConsumerGroupNotExist";

    private final Client slsClient;
    private final ConsumerMetaData consumerMetaData;
    private final long discoveryIntervalMillis;
    private final SourceSplitEnumerator.Context<SlsSourceSplit> context;
    /** Splits discovered (or handed back) but not yet given to a reader, keyed by shard id. */
    private final Map<Integer, SlsSourceSplit> pendingSplit;
    /** Splits already handed to a reader, keyed by shard id. */
    private final Map<Integer, SlsSourceSplit> assignedSplit;
    private SlsSourceState slsSourceState;
    private ScheduledExecutorService executor;
    private ScheduledFuture<?> scheduledFuture;

    /**
     * Creates an enumerator without restored state.
     *
     * @param slsSourceConfig connection settings and consumer metadata
     * @param context enumerator context used to assign splits to readers
     */
    public SlsSourceSplitEnumerator(SlsSourceConfig slsSourceConfig, SourceSplitEnumerator.Context<SlsSourceSplit> context) {
        this(slsSourceConfig, context, null);
    }

    /**
     * Creates an enumerator, optionally with a previously snapshotted state.
     *
     * <p>NOTE(review): the state is stored but not otherwise consulted by the code in this class;
     * {@code assignedSplit} always starts empty.
     *
     * @param slsSourceConfig connection settings and consumer metadata
     * @param context enumerator context used to assign splits to readers
     * @param slsSourceState restored state, may be {@code null}
     */
    public SlsSourceSplitEnumerator(SlsSourceConfig slsSourceConfig, SourceSplitEnumerator.Context<SlsSourceSplit> context, SlsSourceState slsSourceState) {
        this.context = context;
        this.slsClient = new Client(slsSourceConfig.getEndpoint(), slsSourceConfig.getAccessKeyId(), slsSourceConfig.getAccessKeySecret());
        this.assignedSplit = new HashMap<Integer, SlsSourceSplit>();
        this.pendingSplit = new HashMap<Integer, SlsSourceSplit>();
        this.consumerMetaData = slsSourceConfig.getConsumerMetaData();
        this.discoveryIntervalMillis = slsSourceConfig.getDiscoveryIntervalMillis();
        this.slsSourceState = slsSourceState;
    }

    /**
     * Starts periodic shard discovery on a single daemon thread when the configured discovery
     * interval is positive; otherwise does nothing.
     */
    public void open() {
        if (this.discoveryIntervalMillis > 0L) {
            this.executor = Executors.newScheduledThreadPool(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("sls-shard-dynamic-discovery");
                return thread;
            });
            this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                try {
                    this.discoverySplits();
                }
                catch (Exception e) {
                    // Swallow after logging: an exception escaping the task would cancel all
                    // further scheduled runs.
                    log.error("Dynamic discovery failure:", (Throwable)e);
                }
            }, this.discoveryIntervalMillis, this.discoveryIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Lists the shards once and assigns the resulting splits.
     *
     * @throws Exception if listing shards or resolving a cursor fails
     */
    public void run() throws Exception {
        this.fetchPendingShardSplit();
        this.assignSplit();
    }

    /**
     * Stops the periodic discovery task, if one was started by {@link #open()}.
     *
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    public void close() throws IOException {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduledFuture = null;
        }
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
    }

    /**
     * Takes splits back from a reader so they can be assigned again.
     *
     * <p>The splits are removed from {@code assignedSplit}; otherwise {@link #assignSplit()} would
     * skip them as already assigned and they would never be read again.
     *
     * @param splits splits returned by the reader
     * @param subtaskId id of the reader that returned them (unused)
     */
    public synchronized void addSplitsBack(List<SlsSourceSplit> splits, int subtaskId) {
        if (splits == null || splits.isEmpty()) {
            return;
        }
        for (SlsSourceSplit split : splits) {
            this.assignedSplit.remove(split.getShardId());
            this.pendingSplit.put(split.getShardId(), split);
        }
    }

    /**
     * @return the number of splits currently waiting to be assigned
     */
    public synchronized int currentUnassignedSplitSize() {
        return this.pendingSplit.size();
    }

    /** No-op: splits are pushed to readers rather than requested by them. */
    public void handleSplitRequest(int subtaskId) {
    }

    /**
     * Assigns any pending splits when a reader registers.
     *
     * @param subtaskId id of the registering reader (unused; all pending splits are distributed)
     */
    public synchronized void registerReader(int subtaskId) {
        if (!this.pendingSplit.isEmpty()) {
            this.assignSplit();
        }
    }

    /** No-op. */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    /** One discovery round: list shards, then assign whatever is new. */
    private void discoverySplits() throws LogException {
        this.fetchPendingShardSplit();
        this.assignSplit();
    }

    /**
     * Lists the shards of the configured logstore and records a pending split for every shard
     * that is neither pending nor assigned yet.
     *
     * <p>Remote calls are made without holding the instance lock; only the map lookups and the
     * final insertion are synchronized.
     *
     * @throws LogException if listing the shards fails
     * @throws RuntimeException if a cursor cannot be resolved or is empty
     */
    private void fetchPendingShardSplit() throws LogException {
        String project = this.consumerMetaData.getProject();
        String logStore = this.consumerMetaData.getLogstore();
        String consumer = this.consumerMetaData.getConsumerGroup();
        StartMode startMode = this.consumerMetaData.getStartMode();
        int fetchSize = this.consumerMetaData.getFetchSize();
        Consts.CursorMode autoCursorReset = this.consumerMetaData.getAutoCursorReset();
        ListShardResponse shards = this.slsClient.ListShard(project, logStore);
        shards.GetShards().forEach(shard -> {
            int shardId = shard.getShardId();
            if (this.isKnownShard(shardId)) {
                return;
            }
            String cursor;
            try {
                cursor = this.initShardCursor(project, logStore, consumer, shardId, startMode, autoCursorReset);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            if (cursor == null || cursor.isEmpty()) {
                throw new RuntimeException("shard cursor error");
            }
            this.offerPendingSplit(shardId, new SlsSourceSplit(project, logStore, consumer, shardId, cursor, fetchSize));
        });
    }

    /** Returns whether the shard already has a pending or assigned split. */
    private synchronized boolean isKnownShard(int shardId) {
        return this.assignedSplit.containsKey(shardId) || this.pendingSplit.containsKey(shardId);
    }

    /** Records the split as pending unless the shard became known in the meantime. */
    private synchronized void offerPendingSplit(int shardId, SlsSourceSplit split) {
        if (!this.assignedSplit.containsKey(shardId) && !this.pendingSplit.containsKey(shardId)) {
            this.pendingSplit.put(shardId, split);
        }
    }

    /**
     * Resolves the cursor a shard should start reading from.
     *
     * <ul>
     *   <li>{@code EARLIEST}: cursor for {@code Consts.CursorMode.BEGIN}.
     *   <li>{@code LATEST}: cursor for {@code Consts.CursorMode.END}.
     *   <li>{@code GROUP_CURSOR}: the consumer group's checkpoint for the shard (creating the
     *       group first when it is missing); when there is no usable checkpoint, or the service
     *       reports the group does not exist, the cursor for {@code autoCursorReset}.
     * </ul>
     *
     * @param project SLS project
     * @param logStore SLS logstore
     * @param consumer consumer group name
     * @param shardIdKey shard id
     * @param cursorMode configured start mode
     * @param autoCursorReset fallback cursor mode for {@code GROUP_CURSOR}
     * @return the cursor string
     * @throws Exception if a remote call fails in a way not handled above
     * @throws RuntimeException wrapping a {@link LogException}, or when the start mode is not one
     *     of the three handled values
     */
    private String initShardCursor(String project, String logStore, String consumer, int shardIdKey, StartMode cursorMode, Consts.CursorMode autoCursorReset) throws Exception {
        switch (cursorMode) {
            case EARLIEST: {
                try {
                    return this.slsClient.GetCursor(project, logStore, shardIdKey, Consts.CursorMode.BEGIN).GetCursor();
                }
                catch (LogException e) {
                    throw new RuntimeException(e);
                }
            }
            case LATEST: {
                try {
                    return this.slsClient.GetCursor(project, logStore, shardIdKey, Consts.CursorMode.END).GetCursor();
                }
                catch (LogException e) {
                    throw new RuntimeException(e);
                }
            }
            case GROUP_CURSOR: {
                try {
                    if (!this.checkConsumerGroupExists(project, logStore, consumer)) {
                        this.createConsumerGroup(project, logStore, consumer);
                    }
                    ConsumerGroupCheckPointResponse response = this.slsClient.GetCheckPoint(project, logStore, consumer, shardIdKey);
                    List<ConsumerGroupShardCheckPoint> checkpoints = response.getCheckPoints();
                    if (checkpoints.size() == 1) {
                        String checkpoint = checkpoints.get(0).getCheckPoint();
                        if (checkpoint != null && !checkpoint.isEmpty()) {
                            return checkpoint;
                        }
                    }
                    return this.slsClient.GetCursor(project, logStore, shardIdKey, autoCursorReset).GetCursor();
                }
                catch (LogException e) {
                    // Constant-first comparison: the error code may be null.
                    if (ERROR_CONSUMER_GROUP_NOT_EXIST.equals(e.GetErrorCode())) {
                        return this.slsClient.GetCursor(project, logStore, shardIdKey, autoCursorReset).GetCursor();
                    }
                    throw new RuntimeException(e);
                }
            }
        }
        throw new RuntimeException(project + ":" + logStore + ":" + consumer + ":" + cursorMode + ":fail");
    }

    /**
     * Distributes all pending splits to the readers and marks them as assigned.
     *
     * <p>Every reader index in {@code [0, parallelism)} receives an assignment call, possibly
     * with an empty list. When discovery is disabled, each reader is also told that no more
     * splits will follow.
     */
    private synchronized void assignSplit() {
        int parallelism = this.context.currentParallelism();
        Map<Integer, List<SlsSourceSplit>> readySplit = new HashMap<Integer, List<SlsSourceSplit>>(16);
        for (int taskId = 0; taskId < parallelism; ++taskId) {
            readySplit.put(taskId, new ArrayList<SlsSourceSplit>());
        }
        for (Map.Entry<Integer, SlsSourceSplit> pending : this.pendingSplit.entrySet()) {
            if (this.assignedSplit.containsKey(pending.getKey())) {
                continue;
            }
            SlsSourceSplit split = pending.getValue();
            readySplit.get(SlsSourceSplitEnumerator.getSplitOwner(split.getShardId(), parallelism)).add(split);
        }
        for (Map.Entry<Integer, List<SlsSourceSplit>> ready : readySplit.entrySet()) {
            int readerId = ready.getKey();
            this.context.assignSplit(readerId, ready.getValue());
            if (this.discoveryIntervalMillis <= 0L) {
                this.context.signalNoMoreSplits(readerId);
            }
        }
        this.assignedSplit.putAll(this.pendingSplit);
        this.pendingSplit.clear();
    }

    /**
     * Maps a shard to a reader index in {@code [0, numReaders)}.
     *
     * <p>{@link Math#floorMod(int, int)} equals {@code %} for non-negative shard ids and keeps
     * the result a valid index should an id ever be negative.
     */
    private static int getSplitOwner(int shardId, int numReaders) {
        return Math.floorMod(shardId, numReaders);
    }

    /**
     * Snapshots the set of splits that have been assigned so far.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @return a state wrapping a copy of the assigned splits
     */
    public synchronized SlsSourceState snapshotState(long checkpointId) throws Exception {
        return new SlsSourceState(new HashSet<SlsSourceSplit>(this.assignedSplit.values()));
    }

    /**
     * Checks whether a consumer group with the given name is listed for the logstore.
     *
     * @param project SLS project
     * @param logstore SLS logstore
     * @param consumerGroup consumer group name to look for
     * @return {@code true} if a group with exactly that name is listed
     * @throws Exception if listing the consumer groups fails
     */
    public boolean checkConsumerGroupExists(String project, String logstore, String consumerGroup) throws Exception {
        ListConsumerGroupResponse response = this.slsClient.ListConsumerGroup(project, logstore);
        if (response == null) {
            return false;
        }
        for (ConsumerGroup item : response.GetConsumerGroups()) {
            if (item.getConsumerGroupName().equals(consumerGroup)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Creates the consumer group, treating "already exists" as success.
     *
     * <p>The existence check and the creation are separate remote calls, so another client can
     * create the group in between; that outcome is what the caller wanted and is not an error.
     *
     * @param project SLS project
     * @param logstore SLS logstore
     * @param consumerGroupName name of the group to create
     * @throws LogException if creation fails for any reason other than the group already existing
     */
    public void createConsumerGroup(String project, String logstore, String consumerGroupName) throws LogException {
        ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 100, false);
        try {
            this.slsClient.CreateConsumerGroup(project, logstore, consumerGroup);
        }
        catch (LogException ex) {
            if (ERROR_CONSUMER_GROUP_ALREADY_EXIST.equals(ex.GetErrorCode())) {
                log.info("Consumer group {} already exists in {}/{}", consumerGroupName, project, logstore);
                return;
            }
            throw ex;
        }
    }
}

