/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConnector;
import org.apache.kafka.connect.runtime.WorkerSinkTask;
import org.apache.kafka.connect.runtime.WorkerSourceTask;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.reflections.Configuration;
import org.reflections.Reflections;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hosts connectors and tasks inside this process.
 *
 * <p>The worker keeps one {@link WorkerConnector} per connector name and one {@link WorkerTask}
 * per {@link ConnectorTaskId}. Tasks are submitted to an internal cached thread pool; source
 * tasks are additionally registered with a {@link SourceTaskOffsetCommitter}.
 *
 * <p>The connector and task registries are plain {@link HashMap}s and no method here
 * synchronizes access to them. NOTE(review): presumably the herder serializes calls into this
 * class; confirm before calling it from multiple threads.
 */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);
    private final ExecutorService executor;
    private final Time time;
    private final String workerId;
    private final WorkerConfig config;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final Converter internalKeyConverter;
    private final Converter internalValueConverter;
    private final OffsetBackingStore offsetBackingStore;
    // Connectors currently hosted by this worker, keyed by connector name.
    private final Map<String, WorkerConnector> connectors = new HashMap<String, WorkerConnector>();
    // Tasks currently hosted by this worker, keyed by task id.
    private final Map<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
    // Created in start(); handed to every source task built by this worker.
    private KafkaProducer<byte[], byte[]> producer;
    // Created in start(); closed in stop().
    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;

    /**
     * Creates a worker and configures its converters and offset store from {@code config}.
     *
     * @param workerId identifier returned by {@link #workerId()}
     * @param time clock used for shutdown deadlines and passed to the tasks
     * @param config worker configuration; converter settings are read from the
     *     {@code key.converter.}, {@code value.converter.}, {@code internal.key.converter.} and
     *     {@code internal.value.converter.} prefixes
     * @param offsetBackingStore store for source offsets; it is configured here and
     *     started/stopped by {@link #start()} / {@link #stop()}
     */
    public Worker(String workerId, Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
        this.executor = Executors.newCachedThreadPool();
        this.workerId = workerId;
        this.time = time;
        this.config = config;
        // The boolean passed to configure() tells the converter whether it handles keys (true)
        // or values (false).
        this.keyConverter = config.getConfiguredInstance("key.converter", Converter.class);
        this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
        this.valueConverter = config.getConfiguredInstance("value.converter", Converter.class);
        this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
        this.internalKeyConverter = config.getConfiguredInstance("internal.key.converter", Converter.class);
        this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
        this.internalValueConverter = config.getConfiguredInstance("internal.value.converter", Converter.class);
        this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);
        this.offsetBackingStore = offsetBackingStore;
        this.offsetBackingStore.configure(config);
    }

    /**
     * Creates the shared producer, starts the offset backing store and creates the source task
     * offset committer. Must be called before any task is started.
     */
    public void start() {
        log.info("Worker starting");
        Map<String, Object> producerProps = new HashMap<String, Object>();
        producerProps.put("bootstrap.servers", Utils.join(config.getList("bootstrap.servers"), ","));
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Defaults: unbounded timeouts/retries, full acknowledgement, one in-flight request.
        producerProps.put("request.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        producerProps.put("retries", Integer.toString(Integer.MAX_VALUE));
        producerProps.put("max.block.ms", Long.toString(Long.MAX_VALUE));
        producerProps.put("acks", "all");
        producerProps.put("max.in.flight.requests.per.connection", "1");
        // Applied last, so any "producer."-prefixed worker setting overrides the defaults above.
        producerProps.putAll(config.originalsWithPrefix("producer."));
        producer = new KafkaProducer<byte[], byte[]>(producerProps);
        offsetBackingStore.start();
        sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
        log.info("Worker started");
    }

    /**
     * Shuts down whatever connectors and tasks are still registered, then closes the offset
     * committer and stops the offset backing store.
     *
     * <p>Remaining connectors and tasks are logged as warnings because they are expected to have
     * been stopped already.
     *
     * <p>NOTE(review): the producer created in {@link #start()} and the executor are not closed
     * here; confirm whether that is intentional.
     */
    public void stop() {
        log.info("Worker stopping");
        long started = time.milliseconds();
        long limit = started + config.getLong("task.shutdown.graceful.timeout.ms");
        for (Map.Entry<String, WorkerConnector> entry : connectors.entrySet()) {
            log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the Worker is stopped.", entry.getKey());
            entry.getValue().shutdown();
        }
        // Snapshot the ids: awaiting a task removes it from the map, which would invalidate an
        // iterator over the live key set.
        List<ConnectorTaskId> remainingTaskIds = new ArrayList<ConnectorTaskId>(tasks.keySet());
        if (!remainingTaskIds.isEmpty()) {
            log.warn("Shutting down tasks {} uncleanly; herder should have shut down tasks before the Worker is stopped.", remainingTaskIds);
            stopTasks(remainingTaskIds);
            awaitStopTasks(remainingTaskIds);
        }
        long timeoutMs = limit - time.milliseconds();
        sourceTaskOffsetCommitter.close(timeoutMs);
        offsetBackingStore.stop();
        log.info("Worker stopped");
    }

    /**
     * Instantiates, initializes and registers a connector.
     *
     * @param connConfig connector configuration; {@code name} and {@code connector.class} are read
     * @param ctx context handed to the connector
     * @param statusListener listener handed to the {@link WorkerConnector}
     * @param initialState state the connector is transitioned to right after initialization
     * @throws ConnectException if the class cannot be resolved or instantiated, or a connector
     *     with the same name is already registered
     */
    public void startConnector(ConnectorConfig connConfig, ConnectorContext ctx, ConnectorStatus.Listener statusListener, TargetState initialState) {
        String connName = connConfig.getString("name");
        Class<? extends Connector> connClass = getConnectorClass(connConfig.getString("connector.class"));
        log.info("Creating connector {} of type {}", connName, connClass.getName());
        if (connectors.containsKey(connName)) {
            throw new ConnectException("Connector with name " + connName + " already exists");
        }
        Connector connector = instantiateConnector(connClass);
        WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
        log.info("Instantiated connector {} with version {} of type {}", new Object[]{connName, connector.version(), connClass.getName()});
        workerConnector.initialize(connConfig);
        workerConnector.transitionTo(initialState);
        connectors.put(connName, workerConnector);
        log.info("Finished creating connector {}", connName);
    }

    /**
     * Reports whether the named connector is a sink connector.
     *
     * @throws ConnectException if no connector with that name is registered
     */
    public boolean isSinkConnector(String connName) {
        return requireConnector(connName).isSinkConnector();
    }

    /**
     * Resolves {@code connType} (class name or alias) and returns a fresh, unregistered instance.
     *
     * @throws ConnectException if the class cannot be resolved or instantiated
     */
    public Connector getConnector(String connType) {
        return instantiateConnector(getConnectorClass(connType));
    }

    /**
     * Resolves a connector class from a fully qualified class name or a simple-name alias.
     *
     * <p>The name is first tried with {@link Class#forName(String)}. If that fails, the Java
     * classpath is scanned for {@link Connector} subtypes whose simple name equals the alias or
     * the alias followed by {@code "Connector"}.
     *
     * @throws ConnectException if the named class is not a {@link Connector}, or the alias
     *     matches no class or more than one class
     */
    private Class<? extends Connector> getConnectorClass(String connectorAlias) {
        try {
            Class<?> clazz = Class.forName(connectorAlias);
            if (!Connector.class.isAssignableFrom(clazz)) {
                throw new ConnectException("Class " + connectorAlias + " does not implement Connector");
            }
            return clazz.asSubclass(Connector.class);
        } catch (ClassNotFoundException ignored) {
            // Not a loadable class name; treat it as an alias and scan the classpath below.
        }

        Reflections reflections = new Reflections((Configuration) new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
        Set<Class<? extends Connector>> available = reflections.getSubTypesOf(Connector.class);
        List<Class<? extends Connector>> matches = new ArrayList<Class<? extends Connector>>();
        for (Class<? extends Connector> candidate : available) {
            String simpleName = candidate.getSimpleName();
            if (simpleName.equals(connectorAlias) || simpleName.equals(connectorAlias + "Connector")) {
                matches.add(candidate);
            }
        }
        if (matches.isEmpty()) {
            throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorAlias + " available connectors are: " + connectorNames(available));
        }
        if (matches.size() > 1) {
            throw new ConnectException("More than one connector matches alias " + connectorAlias + ". Please use full package + class name instead. Classes found: " + connectorNames(matches));
        }
        return matches.get(0);
    }

    /**
     * Joins the fully qualified names of the given classes with {@code ", "}.
     *
     * @return the joined names, or an empty string when {@code classes} is empty
     */
    private String connectorNames(Collection<Class<? extends Connector>> classes) {
        StringBuilder names = new StringBuilder();
        for (Class<? extends Connector> c : classes) {
            if (names.length() > 0) {
                names.append(", ");
            }
            names.append(c.getName());
        }
        return names.toString();
    }

    /** Reports whether a task with the given id is registered with this worker. */
    public boolean ownsTask(ConnectorTaskId taskId) {
        return tasks.containsKey(taskId);
    }

    /**
     * Creates a connector instance, wrapping any failure in a {@link ConnectException}.
     */
    private static Connector instantiateConnector(Class<? extends Connector> connClass) {
        try {
            return Utils.newInstance(connClass);
        } catch (Throwable t) {
            throw new ConnectException("Failed to create connector instance", t);
        }
    }

    /**
     * Asks the named connector for its task configurations and decorates each one.
     *
     * <p>Every returned map is a copy of the connector-provided properties with
     * {@code task.class} set to the connector's task class name and, when {@code sinkTopics} is
     * non-null, {@code topics} set to the comma-joined topic list.
     *
     * @param connName name of a connector registered with this worker
     * @param maxTasks upper bound passed to {@link Connector#taskConfigs(int)}
     * @param sinkTopics topics to inject into each task config, or {@code null} to inject none
     * @throws ConnectException if no connector with that name is registered
     */
    public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
        log.trace("Reconfiguring connector tasks for {}", connName);
        Connector connector = requireConnector(connName).connector();
        List<Map<String, String>> result = new ArrayList<Map<String, String>>();
        String taskClassName = connector.taskClass().getName();
        for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
            // Copy so the connector's own map is never mutated.
            Map<String, String> taskConfig = new HashMap<String, String>(taskProps);
            taskConfig.put("task.class", taskClassName);
            if (sinkTopics != null) {
                taskConfig.put("topics", Utils.join(sinkTopics, ","));
            }
            result.add(taskConfig);
        }
        return result;
    }

    /**
     * Shuts down the named connector and removes it from this worker.
     *
     * @throws ConnectException if no connector with that name is registered
     */
    public void stopConnector(String connName) {
        log.info("Stopping connector {}", connName);
        requireConnector(connName).shutdown();
        connectors.remove(connName);
        log.info("Stopped connector {}", connName);
    }

    /**
     * Returns the names of the registered connectors.
     *
     * <p>This is the live key-set view of the internal map, so it reflects later registrations
     * and removals.
     */
    public Set<String> connectorNames() {
        return connectors.keySet();
    }

    /**
     * Reports whether the named connector is running.
     *
     * @throws ConnectException if no connector with that name is registered
     */
    public boolean isRunning(String connName) {
        return requireConnector(connName).isRunning();
    }

    /**
     * Instantiates a task, wraps it in the matching {@link WorkerTask}, submits it to the
     * executor and registers it. Source tasks are also scheduled with the offset committer.
     *
     * @param id id under which the task is registered
     * @param taskConfig task configuration; {@code task.class} selects the implementation
     * @param statusListener listener handed to the worker task
     * @param initialState initial target state handed to the worker task
     * @throws ConnectException if a task with this id already exists, the task cannot be
     *     instantiated, or it is neither a {@link SourceTask} nor a {@link SinkTask}
     */
    public void startTask(ConnectorTaskId id, TaskConfig taskConfig, TaskStatus.Listener statusListener, TargetState initialState) {
        log.info("Creating task {}", id);
        if (tasks.containsKey(id)) {
            String msg = "Task already exists in this worker; the herder should not have requested that this : " + id;
            log.error(msg);
            throw new ConnectException(msg);
        }
        Class<? extends Task> taskClass = taskConfig.getClass("task.class").asSubclass(Task.class);
        Task task = instantiateTask(taskClass);
        log.info("Instantiated task {} with version {} of type {}", new Object[]{id, task.version(), taskClass.getName()});
        WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState);
        workerTask.initialize(taskConfig);
        executor.submit(workerTask);
        if (task instanceof SourceTask) {
            // buildWorkerTask returns a WorkerSourceTask for every SourceTask.
            sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
        }
        tasks.put(id, workerTask);
    }

    /**
     * Wraps {@code task} in a {@link WorkerSourceTask} or {@link WorkerSinkTask} depending on
     * its type.
     *
     * @throws ConnectException if the task is neither a source nor a sink task
     */
    private WorkerTask buildWorkerTask(ConnectorTaskId id, Task task, TaskStatus.Listener statusListener, TargetState initialState) {
        if (task instanceof SourceTask) {
            // Offsets are read and written under the connector name, using the internal converters.
            OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), internalKeyConverter, internalValueConverter);
            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), internalKeyConverter, internalValueConverter);
            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, producer, offsetReader, offsetWriter, config, time);
        }
        if (task instanceof SinkTask) {
            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter, valueConverter, time);
        }
        log.error("Tasks must be a subclass of either SourceTask or SinkTask: {}", task);
        throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
    }

    /**
     * Creates a task instance, wrapping a {@link KafkaException} in a {@link ConnectException}.
     */
    private static Task instantiateTask(Class<? extends Task> taskClass) {
        try {
            return Utils.newInstance(taskClass);
        } catch (KafkaException e) {
            throw new ConnectException("Task class not found", e);
        }
    }

    /**
     * Signals every listed task to stop without waiting for it to finish.
     *
     * @throws ConnectException if any id is not registered with this worker
     */
    public void stopTasks(Collection<ConnectorTaskId> ids) {
        for (ConnectorTaskId id : ids) {
            stopTask(getTask(id));
        }
    }

    /**
     * Waits for the listed tasks to stop, sharing a single
     * {@code task.shutdown.graceful.timeout.ms} budget across all of them, and removes each one
     * from this worker. Tasks that do not stop in time are cancelled.
     *
     * @throws ConnectException if any id is not registered with this worker
     */
    public void awaitStopTasks(Collection<ConnectorTaskId> ids) {
        // Copy first: each awaited task is removed from the registry, and callers may pass the
        // registry's own live key set (e.g. the result of taskIds()).
        List<ConnectorTaskId> pending = new ArrayList<ConnectorTaskId>(ids);
        long deadline = time.milliseconds() + config.getLong("task.shutdown.graceful.timeout.ms");
        for (ConnectorTaskId id : pending) {
            long remaining = Math.max(0L, deadline - time.milliseconds());
            awaitStopTask(getTask(id), remaining);
        }
    }

    /**
     * Waits up to {@code timeout} ms for the task to stop, cancels it if it does not, and
     * removes it from the registry either way.
     */
    private void awaitStopTask(WorkerTask task, long timeout) {
        if (!task.awaitStop(timeout)) {
            log.error("Graceful stop of task {} failed.", task.id());
            task.cancel();
        }
        tasks.remove(task.id());
    }

    /**
     * Signals the task to stop; source tasks are first removed from the offset committer.
     */
    private void stopTask(WorkerTask task) {
        log.info("Stopping task {}", task.id());
        if (task instanceof WorkerSourceTask) {
            sourceTaskOffsetCommitter.remove(task.id());
        }
        task.stop();
    }

    /**
     * Stops a single task and waits up to {@code task.shutdown.graceful.timeout.ms} for it.
     *
     * @throws ConnectException if the id is not registered with this worker
     */
    public void stopAndAwaitTask(ConnectorTaskId id) {
        WorkerTask task = getTask(id);
        stopTask(task);
        awaitStopTask(task, config.getLong("task.shutdown.graceful.timeout.ms"));
    }

    /**
     * Returns the ids of the registered tasks.
     *
     * <p>This is the live key-set view of the internal map, so it reflects later registrations
     * and removals.
     */
    public Set<ConnectorTaskId> taskIds() {
        return tasks.keySet();
    }

    /**
     * Looks up a registered task.
     *
     * @throws ConnectException if the id is not registered with this worker
     */
    private WorkerTask getTask(ConnectorTaskId id) {
        WorkerTask task = tasks.get(id);
        if (task == null) {
            log.error("Task not found: " + id);
            throw new ConnectException("Task not found: " + id);
        }
        return task;
    }

    /**
     * Looks up a registered connector.
     *
     * @throws ConnectException if no connector with that name is registered
     */
    private WorkerConnector requireConnector(String connName) {
        WorkerConnector workerConnector = connectors.get(connName);
        if (workerConnector == null) {
            throw new ConnectException("Connector " + connName + " not found in this worker.");
        }
        return workerConnector;
    }

    /** Returns the converter configured under {@code internal.key.converter}. */
    public Converter getInternalKeyConverter() {
        return internalKeyConverter;
    }

    /** Returns the converter configured under {@code internal.value.converter}. */
    public Converter getInternalValueConverter() {
        return internalValueConverter;
    }

    /** Returns the id this worker was constructed with. */
    public String workerId() {
        return workerId;
    }

    /** Reports whether a connector with the given name is registered with this worker. */
    public boolean ownsConnector(String connName) {
        return connectors.containsKey(connName);
    }

    /**
     * Transitions the named connector (if registered here) and every registered task belonging
     * to that connector to {@code state}. Unknown connector names are not an error.
     */
    public void setTargetState(String connName, TargetState state) {
        log.info("Setting connector {} state to {}", connName, state);
        WorkerConnector connector = connectors.get(connName);
        if (connector != null) {
            connector.transitionTo(state);
        }
        for (Map.Entry<ConnectorTaskId, WorkerTask> taskEntry : tasks.entrySet()) {
            if (taskEntry.getKey().connector().equals(connName)) {
                taskEntry.getValue().transitionTo(state);
            }
        }
    }
}

