/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.Suspendable;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConsumerHealthCheck;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.component.kafka.KafkaFetchRecords;
import org.apache.camel.component.kafka.TaskHealthState;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.health.HealthCheck;
import org.apache.camel.health.HealthCheckAware;
import org.apache.camel.health.HealthCheckHelper;
import org.apache.camel.health.WritableHealthCheckRepository;
import org.apache.camel.resume.ConsumerListenerAware;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumer
extends DefaultConsumer
implements ResumeAware<ResumeStrategy>,
HealthCheckAware,
ConsumerListenerAware<KafkaConsumerListener>,
Suspendable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    protected ExecutorService executor;
    private final KafkaEndpoint endpoint;
    private KafkaConsumerHealthCheck consumerHealthCheck;
    private WritableHealthCheckRepository healthCheckRepository;
    private final List<KafkaFetchRecords> tasks = new ArrayList<KafkaFetchRecords>();
    private volatile boolean stopOffsetRepo;
    private ResumeStrategy resumeStrategy;
    private KafkaConsumerListener consumerListener;

    public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
    }

    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
        this.resumeStrategy = resumeStrategy;
    }

    public ResumeStrategy getResumeStrategy() {
        return this.resumeStrategy;
    }

    public KafkaConsumerListener getConsumerListener() {
        return this.consumerListener;
    }

    public void setConsumerListener(KafkaConsumerListener consumerListener) {
        this.consumerListener = consumerListener;
    }

    protected void doBuild() throws Exception {
        super.doBuild();
    }

    public KafkaEndpoint getEndpoint() {
        return (KafkaEndpoint)super.getEndpoint();
    }

    private String randomUUID() {
        return UUID.randomUUID().toString();
    }

    Properties getProps() {
        KafkaConfiguration configuration = this.endpoint.getConfiguration();
        Properties props = configuration.createConsumerProperties();
        this.endpoint.updateClassProperties(props);
        ObjectHelper.ifNotEmpty((Object)this.endpoint.getKafkaClientFactory().getBrokers(configuration), v -> props.put("bootstrap.servers", v));
        String groupId = (String)ObjectHelper.supplyIfEmpty((Object)configuration.getGroupId(), this::randomUUID);
        props.put("group.id", groupId);
        ObjectHelper.ifNotEmpty((Object)configuration.getGroupInstanceId(), v -> props.put("group.instance.id", v));
        return props;
    }

    protected void doStart() throws Exception {
        boolean started;
        StateRepository<String, String> repo;
        LOG.info("Starting Kafka consumer on topic: {} with breakOnFirstError: {}", (Object)this.endpoint.getConfiguration().getTopic(), (Object)this.endpoint.getConfiguration().isBreakOnFirstError());
        super.doStart();
        this.healthCheckRepository = (WritableHealthCheckRepository)HealthCheckHelper.getHealthCheckRepository((CamelContext)this.endpoint.getCamelContext(), (String)"components", WritableHealthCheckRepository.class);
        if (this.healthCheckRepository != null) {
            this.consumerHealthCheck = new KafkaConsumerHealthCheck(this, this.getRouteId());
            this.healthCheckRepository.addHealthCheck((HealthCheck)this.consumerHealthCheck);
        }
        if ((repo = this.endpoint.getConfiguration().getOffsetRepository()) instanceof ServiceSupport && !(started = ((ServiceSupport)repo).isStarted())) {
            this.stopOffsetRepo = true;
            LOG.debug("Starting OffsetRepository: {}", repo);
            ServiceHelper.startService(this.endpoint.getConfiguration().getOffsetRepository());
        }
        this.executor = this.endpoint.createExecutor();
        String topic = this.endpoint.getConfiguration().getTopic();
        Pattern pattern = null;
        if (this.endpoint.getConfiguration().isTopicIsPattern()) {
            pattern = Pattern.compile(topic);
        }
        BridgeExceptionHandlerToErrorHandler bridge = new BridgeExceptionHandlerToErrorHandler((DefaultConsumer)this);
        for (int i = 0; i < this.endpoint.getConfiguration().getConsumersCount(); ++i) {
            KafkaFetchRecords task = new KafkaFetchRecords(this, bridge, topic, pattern, Integer.toString(i), this.getProps(), this.consumerListener);
            this.executor.submit(task);
            this.tasks.add(task);
        }
    }

    protected void doStop() throws Exception {
        if (this.endpoint.getConfiguration().isTopicIsPattern()) {
            LOG.info("Stopping Kafka consumer on topic pattern: {}", (Object)this.endpoint.getConfiguration().getTopic());
        } else {
            LOG.info("Stopping Kafka consumer on topic: {}", (Object)this.endpoint.getConfiguration().getTopic());
        }
        if (this.healthCheckRepository != null && this.consumerHealthCheck != null) {
            this.healthCheckRepository.removeHealthCheck((HealthCheck)this.consumerHealthCheck);
            this.consumerHealthCheck = null;
        }
        if (this.executor != null) {
            if (this.getEndpoint() != null && this.getEndpoint().getCamelContext() != null) {
                for (KafkaFetchRecords task : this.tasks) {
                    task.stop();
                }
                timeout = this.getEndpoint().getConfiguration().getShutdownTimeout();
                LOG.debug("Shutting down Kafka consumer worker threads with timeout {} millis", (Object)timeout);
                this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor, (long)timeout);
            } else {
                this.executor.shutdown();
                timeout = this.endpoint.getConfiguration().getShutdownTimeout();
                LOG.debug("Shutting down Kafka consumer worker threads with timeout {} millis", (Object)timeout);
                if (!this.executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                    LOG.warn("Shutting down Kafka {} consumer worker threads did not finish within {} millis", (Object)this.tasks.size(), (Object)timeout);
                }
            }
            if (!this.executor.isTerminated()) {
                this.tasks.forEach(KafkaFetchRecords::stop);
                this.executor.shutdownNow();
            }
        }
        this.tasks.clear();
        this.executor = null;
        if (this.stopOffsetRepo) {
            StateRepository<String, String> repo = this.endpoint.getConfiguration().getOffsetRepository();
            LOG.debug("Stopping OffsetRepository: {}", repo);
            ServiceHelper.stopAndShutdownService(repo);
        }
        super.doStop();
    }

    protected void doSuspend() throws Exception {
        for (KafkaFetchRecords task : this.tasks) {
            LOG.info("Pausing Kafka record fetcher task running client ID {}", (Object)task.healthState().getClientId());
            task.pause();
        }
        super.doSuspend();
    }

    protected void doResume() throws Exception {
        for (KafkaFetchRecords task : this.tasks) {
            LOG.info("Resuming Kafka record fetcher task running client ID {}", (Object)task.healthState().getClientId());
            task.resume();
        }
        super.doResume();
    }

    public List<TaskHealthState> healthStates() {
        return this.tasks.stream().map(t -> t.healthState()).collect(Collectors.toList());
    }

    public String adapterFactoryService() {
        return "kafka-adapter-factory";
    }
}

