/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerSinkTask;
import org.apache.kafka.connect.runtime.WorkerSourceTask;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hosts a set of connectors and their tasks inside a single process.
 *
 * <p>The worker owns the key/value converters, the offset backing store, the
 * producer handed to source tasks and the committer that periodically commits
 * source task offsets. Connectors and tasks are added and removed by the
 * caller (referred to as the "herder" in the log messages below).
 *
 * <p>This class performs no synchronization of its own; the connector and
 * task registries are plain {@link HashMap}s.
 */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);

    /** Config key holding the graceful task shutdown budget, in milliseconds. */
    private static final String TASK_SHUTDOWN_TIMEOUT_MS_CONFIG = "task.shutdown.graceful.timeout.ms";

    private final Time time;
    private final WorkerConfig config;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final Converter internalKeyConverter;
    private final Converter internalValueConverter;
    private final OffsetBackingStore offsetBackingStore;
    private final Map<String, Connector> connectors = new HashMap<String, Connector>();
    private final Map<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
    // Both of the following are created in start(), not in the constructor.
    private KafkaProducer<byte[], byte[]> producer;
    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;

    /**
     * Creates a worker that uses the system clock.
     *
     * @param config worker configuration
     * @param offsetBackingStore store used to persist source offsets
     */
    public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) {
        this((Time) new SystemTime(), config, offsetBackingStore);
    }

    /**
     * Creates a worker, instantiating and configuring the four converters
     * named in {@code config} and configuring the offset backing store.
     *
     * @param time clock used for shutdown deadlines and passed on to tasks
     * @param config worker configuration
     * @param offsetBackingStore store used to persist source offsets
     */
    public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
        this.time = time;
        this.config = config;
        // The boolean passed to configure() marks whether the converter handles keys.
        this.keyConverter = config.getConfiguredInstance("key.converter", Converter.class);
        this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
        this.valueConverter = config.getConfiguredInstance("value.converter", Converter.class);
        this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
        this.internalKeyConverter = config.getConfiguredInstance("internal.key.converter", Converter.class);
        this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
        this.internalValueConverter = config.getConfiguredInstance("internal.value.converter", Converter.class);
        this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);
        this.offsetBackingStore = offsetBackingStore;
        this.offsetBackingStore.configure(config.originals());
    }

    /**
     * Creates the shared producer, starts the offset backing store and creates
     * the source task offset committer.
     */
    public void start() {
        log.info("Worker starting");
        Map<String, Object> producerProps = new HashMap<String, Object>();
        producerProps.put("bootstrap.servers", Utils.join(this.config.getList("bootstrap.servers"), ","));
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Defaults: effectively unbounded timeouts/retries, acks from all
        // replicas, and a single in-flight request per connection.
        producerProps.put("request.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        producerProps.put("retries", Integer.toString(Integer.MAX_VALUE));
        producerProps.put("max.block.ms", Long.toString(Long.MAX_VALUE));
        producerProps.put("acks", "all");
        producerProps.put("max.in.flight.requests.per.connection", "1");
        // Applied last, so user-supplied "producer."-prefixed settings override the defaults above.
        producerProps.putAll(this.config.originalsWithPrefix("producer."));
        this.producer = new KafkaProducer<byte[], byte[]>(producerProps);
        this.offsetBackingStore.start();
        this.sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(this.time, this.config);
        log.info("Worker started");
    }

    /**
     * Stops every connector and task still registered, then closes the offset
     * committer, the producer and the offset backing store.
     *
     * <p>All waits share a single deadline of
     * {@code task.shutdown.graceful.timeout.ms} measured from the moment this
     * method is entered. Connectors and tasks are not removed from the
     * registries by this method.
     */
    public void stop() {
        log.info("Worker stopping");
        long started = this.time.milliseconds();
        long limit = started + this.config.getLong(TASK_SHUTDOWN_TIMEOUT_MS_CONFIG);

        for (Connector conn : this.connectors.values()) {
            log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the Worker is stopped.", (Object) conn);
            try {
                conn.stop();
            } catch (ConnectException e) {
                log.error("Error while shutting down connector " + conn, (Throwable) e);
            }
        }

        // Signal every task first, then wait, so the tasks shut down in
        // parallel against the shared deadline instead of one after another.
        for (WorkerTask task : this.tasks.values()) {
            log.warn("Shutting down task {} uncleanly; herder should have shut down tasks before the Worker is stopped.", (Object) task);
            try {
                task.stop();
            } catch (ConnectException e) {
                log.error("Error while shutting down task " + task, (Throwable) e);
            }
        }
        for (WorkerTask task : this.tasks.values()) {
            log.debug("Waiting for task {} to finish shutting down", (Object) task);
            if (!task.awaitStop(this.remainingMs(limit))) {
                log.error("Graceful shutdown of task {} failed.", (Object) task);
            }
            task.close();
        }

        this.sourceTaskOffsetCommitter.close(this.remainingMs(limit));
        this.closeProducer(limit);
        this.offsetBackingStore.stop();
        log.info("Worker stopped");
    }

    /** Returns the milliseconds left until {@code limit}, never less than zero. */
    private long remainingMs(long limit) {
        return Math.max(limit - this.time.milliseconds(), 0L);
    }

    /**
     * Closes the producer created by {@link #start()}, waiting at most until
     * {@code limit}. A failure is logged rather than propagated so the rest of
     * the shutdown sequence still runs.
     */
    private void closeProducer(long limit) {
        if (this.producer == null) {
            return;
        }
        try {
            // The timeout is clamped because close() rejects negative values.
            this.producer.close(this.remainingMs(limit), java.util.concurrent.TimeUnit.MILLISECONDS);
        } catch (KafkaException e) {
            log.error("Error while closing the worker's producer", (Throwable) e);
        }
    }

    /**
     * @return the configuration this worker was created with
     */
    public WorkerConfig config() {
        return this.config;
    }

    /**
     * Instantiates, initializes and starts a connector, then registers it
     * under the name found in {@code connConfig}.
     *
     * @param connConfig connector configuration; supplies {@code name} and {@code connector.class}
     * @param ctx context handed to the connector's {@code initialize}
     * @throws ConnectException if the configured class is not a {@link Connector},
     *         a connector with the same name is already registered, the class
     *         cannot be instantiated, or the connector fails to start
     */
    public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
        String connName = connConfig.getString("name");
        Class<?> maybeConnClass = connConfig.getClass("connector.class");
        log.info("Creating connector {} of type {}", (Object) connName, (Object) maybeConnClass.getName());

        Class<? extends Connector> connClass;
        try {
            connClass = maybeConnClass.asSubclass(Connector.class);
        } catch (ClassCastException e) {
            throw new ConnectException("Specified class is not a subclass of Connector: " + maybeConnClass.getName(), (Throwable) e);
        }
        if (this.connectors.containsKey(connName)) {
            throw new ConnectException("Connector with name " + connName + " already exists");
        }

        Connector connector = Worker.instantiateConnector(connClass);
        log.info("Instantiated connector {} with version {} of type {}", new Object[]{connName, connector.version(), connClass.getName()});
        connector.initialize(ctx);
        try {
            connector.start(connConfig.originalsStrings());
        } catch (ConnectException e) {
            throw new ConnectException("Connector threw an exception while starting", (Throwable) e);
        }
        // Registered only after a successful start.
        this.connectors.put(connName, connector);
        log.info("Finished creating connector {}", (Object) connName);
    }

    /**
     * Creates a connector instance, wrapping any failure in a {@link ConnectException}.
     */
    private static Connector instantiateConnector(Class<? extends Connector> connClass) {
        try {
            return Utils.newInstance(connClass);
        } catch (Throwable t) {
            throw new ConnectException("Failed to create connector instance", t);
        }
    }

    /**
     * Asks a registered connector for its task configurations and decorates
     * each one with the task class name and, optionally, the sink topics.
     *
     * @param connName name of a connector registered with this worker
     * @param maxTasks upper bound passed to the connector's {@code taskConfigs}
     * @param sinkTopics when non-null, joined with commas and stored under {@code topics}
     * @return one new map per task configuration returned by the connector
     * @throws ConnectException if no connector named {@code connName} is registered
     */
    public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
        log.trace("Reconfiguring connector tasks for {}", (Object) connName);
        Connector connector = this.connectors.get(connName);
        if (connector == null) {
            throw new ConnectException("Connector " + connName + " not found in this worker.");
        }

        List<Map<String, String>> result = new ArrayList<Map<String, String>>();
        String taskClassName = connector.taskClass().getName();
        for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
            // Copy so the connector's own map is never modified.
            Map<String, String> taskConfig = new HashMap<String, String>(taskProps);
            taskConfig.put("task.class", taskClassName);
            if (sinkTopics != null) {
                taskConfig.put("topics", Utils.join(sinkTopics, ","));
            }
            result.add(taskConfig);
        }
        return result;
    }

    /**
     * Stops a registered connector and removes it from this worker. A
     * {@link ConnectException} thrown by the connector's {@code stop} is
     * logged and the connector is still removed.
     *
     * @param connName name of the connector to stop
     * @throws ConnectException if no connector named {@code connName} is registered
     */
    public void stopConnector(String connName) {
        log.info("Stopping connector {}", (Object) connName);
        Connector connector = this.connectors.get(connName);
        if (connector == null) {
            throw new ConnectException("Connector " + connName + " not found in this worker.");
        }
        try {
            connector.stop();
        } catch (ConnectException e) {
            log.error("Error shutting down connector {}: ", (Object) connector, (Object) e);
        }
        this.connectors.remove(connName);
        log.info("Stopped connector {}", (Object) connName);
    }

    /**
     * @return the key set of the connector registry; this is a live view of
     *         the underlying map, not a copy
     */
    public Set<String> connectorNames() {
        return this.connectors.keySet();
    }

    /**
     * Instantiates the task class named in {@code taskConfig}, wraps it in the
     * matching worker task, starts it and registers it under {@code id}.
     * Source tasks are additionally scheduled with the offset committer.
     *
     * @param id identifier of the task to create
     * @param taskConfig task configuration; supplies {@code task.class}
     * @throws ConnectException if a task with this id is already registered,
     *         the task cannot be instantiated, or it is neither a
     *         {@link SourceTask} nor a {@link SinkTask}
     */
    public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
        log.info("Creating task {}", (Object) id);
        if (this.tasks.containsKey(id)) {
            String msg = "Task already exists in this worker; the herder should not have requested that this : " + id;
            log.error(msg);
            throw new ConnectException(msg);
        }

        Class<? extends Task> taskClass = taskConfig.getClass("task.class").asSubclass(Task.class);
        Task task = Worker.instantiateTask(taskClass);
        log.info("Instantiated task {} with version {} of type {}", new Object[]{id, task.version(), taskClass.getName()});

        WorkerTask workerTask;
        if (task instanceof SourceTask) {
            SourceTask sourceTask = (SourceTask) task;
            // Offsets are read and written with the internal converters, keyed by connector name.
            OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl(this.offsetBackingStore, id.connector(), this.internalKeyConverter, this.internalValueConverter);
            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(this.offsetBackingStore, id.connector(), this.internalKeyConverter, this.internalValueConverter);
            workerTask = new WorkerSourceTask(id, sourceTask, this.keyConverter, this.valueConverter, this.producer, offsetReader, offsetWriter, this.config, this.time);
        } else if (task instanceof SinkTask) {
            workerTask = new WorkerSinkTask(id, (SinkTask) task, this.config, this.keyConverter, this.valueConverter, this.time);
        } else {
            log.error("Tasks must be a subclass of either SourceTask or SinkTask: {}", (Object) task);
            throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
        }

        workerTask.start(taskConfig.originalsStrings());
        if (task instanceof SourceTask) {
            this.sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
        }
        this.tasks.put(id, workerTask);
    }

    /**
     * Creates a task instance, wrapping a {@link KafkaException} from the
     * instantiation helper in a {@link ConnectException}.
     */
    private static Task instantiateTask(Class<? extends Task> taskClass) {
        try {
            return Utils.newInstance(taskClass);
        } catch (KafkaException e) {
            throw new ConnectException("Task class not found", (Throwable) e);
        }
    }

    /**
     * Stops a registered task, waits up to
     * {@code task.shutdown.graceful.timeout.ms} for it to finish, closes it
     * and removes it from this worker. Source tasks are first removed from the
     * offset committer.
     *
     * @param id identifier of the task to stop
     * @throws ConnectException if no task with this id is registered
     */
    public void stopTask(ConnectorTaskId id) {
        log.info("Stopping task {}", (Object) id);
        WorkerTask task = this.getTask(id);
        if (task instanceof WorkerSourceTask) {
            this.sourceTaskOffsetCommitter.remove(id);
        }
        task.stop();
        if (!task.awaitStop(this.config.getLong(TASK_SHUTDOWN_TIMEOUT_MS_CONFIG))) {
            log.error("Graceful stop of task {} failed.", (Object) task);
        }
        task.close();
        this.tasks.remove(id);
    }

    /**
     * @return the key set of the task registry; this is a live view of the
     *         underlying map, not a copy
     */
    public Set<ConnectorTaskId> taskIds() {
        return this.tasks.keySet();
    }

    /**
     * Looks up a registered task.
     *
     * @throws ConnectException if no task with this id is registered
     */
    private WorkerTask getTask(ConnectorTaskId id) {
        WorkerTask task = this.tasks.get(id);
        if (task == null) {
            log.error("Task not found: " + id);
            throw new ConnectException("Task not found: " + id);
        }
        return task;
    }

    /**
     * @return the converter configured under {@code internal.key.converter}
     */
    public Converter getInternalKeyConverter() {
        return this.internalKeyConverter;
    }

    /**
     * @return the converter configured under {@code internal.value.converter}
     */
    public Converter getInternalValueConverter() {
        return this.internalValueConverter;
    }
}

