/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.loghub.client;

import com.aliyun.openservices.loghub.client.LogHubClientAdapter;
import com.aliyun.openservices.loghub.client.LogHubHeartBeat;
import com.aliyun.openservices.loghub.client.LogThreadFactory;
import com.aliyun.openservices.loghub.client.LoghubClientUtil;
import com.aliyun.openservices.loghub.client.ShardConsumer;
import com.aliyun.openservices.loghub.client.ShardFilter;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
import com.aliyun.openservices.loghub.client.throttle.FixedResourceBarrier;
import com.aliyun.openservices.loghub.client.throttle.ResourceBarrier;
import com.aliyun.openservices.loghub.client.throttle.UnlimitedResourceBarrier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientWorker
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ClientWorker.class);
    private final ILogHubProcessorFactory processorFactory;
    private final LogHubConfig logHubConfig;
    private final LogHubHeartBeat logHubHeartBeat;
    private final Map<Integer, ShardConsumer> shardConsumer = new HashMap<Integer, ShardConsumer>();
    private final ExecutorService executorService;
    private final LogHubClientAdapter loghubClient;
    private volatile boolean shutDown = false;
    private volatile boolean mainLoopExit = false;
    private ResourceBarrier resourceBarrier;
    private int lastFetchThrottleMinShard = 0;
    private boolean shareThreadPool;

    public ClientWorker(ILogHubProcessorFactory factory, LogHubConfig config) throws LogHubClientWorkerException {
        this(factory, config, null);
    }

    public ClientWorker(ILogHubProcessorFactory factory, LogHubConfig config, ExecutorService threadPool) throws LogHubClientWorkerException {
        this.processorFactory = factory;
        this.logHubConfig = config;
        if (threadPool == null) {
            this.shareThreadPool = false;
            this.executorService = Executors.newCachedThreadPool(new LogThreadFactory());
        } else {
            this.shareThreadPool = true;
            this.executorService = threadPool;
        }
        this.loghubClient = new LogHubClientAdapter(config);
        try {
            this.loghubClient.createConsumerGroupIfNotExist(config);
        }
        catch (LogHubClientWorkerException ex) {
            this.loghubClient.shutdown();
            throw ex;
        }
        this.logHubHeartBeat = new LogHubHeartBeat(this.loghubClient, config);
        int dataSizeInMB = this.logHubConfig.getMaxInProgressingDataSizeInMB();
        this.resourceBarrier = dataSizeInMB > 0 ? new FixedResourceBarrier((long)dataSizeInMB * 1024L * 1024L) : new UnlimitedResourceBarrier();
    }

    public void SwitchClient(String accessKeyId, String accessKey) {
        this.loghubClient.SwitchClient(this.logHubConfig.getEndpoint(), accessKeyId, accessKey, null);
    }

    public void SwitchClient(String accessKeyId, String accessKey, String stsToken) {
        this.loghubClient.SwitchClient(this.logHubConfig.getEndpoint(), accessKeyId, accessKey, stsToken);
    }

    public void setShardFilter(ShardFilter shardFilter) {
        this.logHubHeartBeat.setShardFilter(shardFilter);
    }

    private static List<Integer> sortShards(List<Integer> shards) {
        if (shards == null) {
            return Collections.emptyList();
        }
        Collections.sort(shards);
        return shards;
    }

    @Override
    public void run() {
        this.logHubHeartBeat.start();
        long fetchInterval = this.logHubConfig.getFetchIntervalMillis();
        while (!this.shutDown) {
            List<Integer> heldShards = this.logHubHeartBeat.getHeldShards();
            List<Integer> shards = ClientWorker.sortShards(heldShards);
            int curFetchThrottleMinShard = -1;
            for (int shard : shards) {
                ShardConsumer consumer = this.consumerForShard(shard);
                if (consumer.consume(shard >= this.lastFetchThrottleMinShard) || curFetchThrottleMinShard >= 0) continue;
                curFetchThrottleMinShard = shard;
            }
            this.lastFetchThrottleMinShard = Math.max(curFetchThrottleMinShard, 0);
            this.cleanConsumer(heldShards);
            LoghubClientUtil.sleep(fetchInterval);
        }
        this.mainLoopExit = true;
    }

    public void shutdown() {
        this.shutDown = true;
        int times = 0;
        while (!this.mainLoopExit && times++ < 20) {
            LoghubClientUtil.sleep(1000L);
        }
        for (ShardConsumer consumer : this.shardConsumer.values()) {
            consumer.shutdown();
        }
        if (!this.shareThreadPool) {
            LoghubClientUtil.shutdownThreadPool(this.executorService, 30L);
        }
        this.logHubHeartBeat.stop();
        this.loghubClient.shutdown();
    }

    private void cleanConsumer(List<Integer> ownedShard) {
        ArrayList<Integer> shardToUnload = new ArrayList<Integer>();
        for (Map.Entry<Integer, ShardConsumer> entry : this.shardConsumer.entrySet()) {
            int shard = entry.getKey();
            if (ownedShard.contains(shard)) continue;
            LOG.warn("Shard {} has been assigned to another consumer.", (Object)shard);
            ShardConsumer consumer = entry.getValue();
            if (consumer.canBeUnloaded()) {
                LOG.info("Shutting down consumer of shard: {}", (Object)shard);
                consumer.shutdown();
            } else {
                LOG.warn("Shard {} cannot be unloaded as it's checkpoint has not been committed yet", (Object)shard);
            }
            if (!consumer.isShutdown()) continue;
            shardToUnload.add(shard);
            LOG.info("Shard shutdown done, removing from heartbeat list: {}", (Object)shard);
        }
        for (Integer shard : shardToUnload) {
            this.shardConsumer.remove(shard);
        }
        for (Integer shard : ownedShard) {
            if (this.shardConsumer.containsKey(shard)) continue;
            shardToUnload.add(shard);
        }
        if (!shardToUnload.isEmpty()) {
            this.logHubHeartBeat.unsubscribe(shardToUnload);
            LOG.warn("Cancel heart beating for {}", (Object)Arrays.toString(shardToUnload.toArray()));
        }
    }

    private ShardConsumer consumerForShard(int shardId) {
        ShardConsumer consumer = this.shardConsumer.get(shardId);
        if (consumer != null) {
            return consumer;
        }
        consumer = new ShardConsumer(this.loghubClient, shardId, this.processorFactory.generatorProcessor(), this.executorService, this.logHubConfig, this.logHubHeartBeat, this.resourceBarrier);
        this.shardConsumer.put(shardId, consumer);
        LOG.info("Create a consumer for shard: {}", (Object)shardId);
        return consumer;
    }
}

