/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.kafka.connect.impl;

import com.hazelcast.client.impl.protocol.util.PropertiesUtil;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ReflectionUtils;
import com.hazelcast.jet.kafka.connect.impl.State;
import com.hazelcast.jet.kafka.connect.impl.TaskRunner;
import com.hazelcast.jet.kafka.connect.impl.message.TaskConfigMessage;
import com.hazelcast.jet.kafka.connect.impl.message.TaskConfigPublisher;
import com.hazelcast.jet.retry.IntervalFunction;
import com.hazelcast.jet.retry.RetryStrategies;
import com.hazelcast.jet.retry.RetryStrategy;
import com.hazelcast.jet.retry.impl.RetryTracker;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.topic.Message;
import com.hazelcast.topic.MessageListener;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class SourceConnectorWrapper {
    public static final RetryStrategy DEFAULT_RECONNECT_BEHAVIOR = RetryStrategies.custom().maxAttempts(10).intervalFunction(IntervalFunction.exponentialBackoffWithCap((long)200L, (double)1.5, (long)5000L)).build();
    private final ILogger logger = Logger.getLogger(SourceConnectorWrapper.class);
    private SourceConnector sourceConnector;
    private int tasksMax;
    private TaskRunner taskRunner;
    private final ReentrantLock reconfigurationLock = new ReentrantLock();
    private final State state = new State();
    private String name;
    private final boolean isMasterProcessor;
    private final int processorOrder;
    private TaskConfigPublisher taskConfigPublisher;
    private final AtomicBoolean receivedTaskConfiguration = new AtomicBoolean();
    private final RetryTracker reconnectTracker;
    private Map<String, String> currentConfig;
    private Consumer<Boolean> activeStatusSetter = ignored -> {};
    private transient Exception lastConnectionException;

    public SourceConnectorWrapper(Properties currentConfig, int processorOrder, Processor.Context context) {
        this(currentConfig, processorOrder, context, DEFAULT_RECONNECT_BEHAVIOR);
    }

    public SourceConnectorWrapper(Properties currentConfig, int processorOrder, Processor.Context context, RetryStrategy retryStrategy) {
        this.validatePropertiesFromUser(currentConfig);
        this.processorOrder = processorOrder;
        this.isMasterProcessor = processorOrder == 0;
        RetryStrategy rs = retryStrategy == null ? DEFAULT_RECONNECT_BEHAVIOR : retryStrategy;
        this.reconnectTracker = new RetryTracker(rs);
        this.currentConfig = PropertiesUtil.toMap((Properties)currentConfig);
        this.createTopic(context.hazelcastInstance(), context.executionId());
        this.createSourceConnector();
    }

    void validatePropertiesFromUser(Properties propertiesFromUser) {
        Preconditions.checkRequiredProperty((Properties)propertiesFromUser, (String)"connector.class");
        this.name = Preconditions.checkRequiredProperty((Properties)propertiesFromUser, (String)"name");
        String propertyValue = Preconditions.checkRequiredProperty((Properties)propertiesFromUser, (String)"tasks.max");
        this.tasksMax = Integer.parseInt(propertyValue);
    }

    void createSourceConnector() {
        String connectorClazz = this.currentConfig.get("connector.class");
        if (!this.reconnectTracker.shouldTryAgain() && this.lastConnectionException != null) {
            throw new HazelcastException("Cannot connect using connector " + connectorClazz, (Throwable)this.lastConnectionException);
        }
        this.logger.info("Initializing connector '" + this.name + "' of class '" + connectorClazz + "'");
        try {
            this.sourceConnector = SourceConnectorWrapper.newConnectorInstance(connectorClazz);
            if (this.isMasterProcessor) {
                this.sourceConnector.initialize((ConnectorContext)new JetConnectorContext());
                this.logger.info("Starting connector '" + this.name + "'. Below are the propertiesFromUser");
                this.sourceConnector.start(this.currentConfig);
                this.logger.info("Connector '" + this.name + "' started");
            } else {
                this.logger.info("Connector '" + this.name + "' created, not starting because it's not a master processor");
            }
        }
        catch (Exception e) {
            this.logger.warning("Error while starting connector", (Throwable)e);
            this.reconnectTracker.attemptFailed();
            if (this.sourceConnector != null) {
                this.sourceConnector.stop();
                this.sourceConnector = null;
            }
            this.lastConnectionException = e;
            return;
        }
        try {
            this.logger.info("Creating task runner '" + this.name + "'");
            this.createTaskRunner();
            this.logger.info("Task runner '" + this.name + "' created");
        }
        catch (Exception e) {
            this.reconnectTracker.attemptFailed();
            this.lastConnectionException = e;
        }
    }

    boolean waitNeeded() {
        if (!this.reconnectTracker.shouldTryAgain()) {
            throw new HazelcastException("Cannot launch connector and/or task correctly", (Throwable)this.lastConnectionException);
        }
        if (this.reconnectTracker.needsToWait()) {
            return true;
        }
        if (this.sourceConnector == null) {
            this.createSourceConnector();
        }
        return !this.restartTaskIfNeeded();
    }

    private boolean restartTaskIfNeeded() {
        if (this.sourceConnector == null) {
            return false;
        }
        try {
            this.taskRunner.restartTaskIfNeeded();
            return true;
        }
        catch (Exception e) {
            this.logger.warning("Error while restarting task", (Throwable)e);
            this.taskRunner.forceRestart();
            this.reconnectTracker.attemptFailed();
            this.lastConnectionException = e;
            return false;
        }
    }

    TaskRunner createTaskRunner() {
        String taskName = this.name + "-task-" + this.processorOrder;
        this.taskRunner = new TaskRunner(taskName, this.state, this::createSourceTask);
        this.requestTaskReconfiguration();
        return this.taskRunner;
    }

    private SourceTask createSourceTask() {
        Class<SourceTask> taskClass = this.sourceConnector.taskClass().asSubclass(SourceTask.class);
        return (SourceTask)ReflectionUtils.newInstance((ClassLoader)Thread.currentThread().getContextClassLoader(), (String)taskClass.getName());
    }

    void setActiveStatusSetter(Consumer<Boolean> activeStatusSetter) {
        this.activeStatusSetter = activeStatusSetter;
    }

    public boolean hasTaskConfiguration() {
        return this.receivedTaskConfiguration.get();
    }

    void createTopic(HazelcastInstance hazelcastInstance, long executionId) {
        if (hazelcastInstance != null) {
            this.taskConfigPublisher = new TaskConfigPublisher(hazelcastInstance);
            this.taskConfigPublisher.createTopic(executionId);
            this.taskConfigPublisher.addMessageListener((MessageListener<TaskConfigMessage>)((MessageListener)this::processMessage));
        }
    }

    void destroyTopic() {
        this.taskConfigPublisher.removeMessageListeners();
        if (this.isMasterProcessor) {
            this.taskConfigPublisher.destroyTopic();
        }
    }

    protected void publishMessage(TaskConfigMessage taskConfigMessage) {
        if (this.taskConfigPublisher != null) {
            this.logger.info("Publishing TaskConfigTopic");
            this.taskConfigPublisher.publish(taskConfigMessage);
        }
    }

    private void processMessage(Message<TaskConfigMessage> message) {
        this.logger.info("Received TaskConfigTopic message");
        TaskConfigMessage taskConfigMessage = (TaskConfigMessage)message.getMessageObject();
        this.processMessage(taskConfigMessage);
    }

    protected void processMessage(TaskConfigMessage taskConfigMessage) {
        this.state.setTaskConfigs(taskConfigMessage.getTaskConfigs());
        Map<String, String> taskConfig = this.state.getTaskConfig(this.processorOrder);
        boolean active = taskConfig != null;
        this.activeStatusSetter.accept(active);
        if (taskConfig != null) {
            this.logger.info("Updating taskRunner with processorOrder = " + this.processorOrder + " with taskConfig=" + this.maskPasswords(taskConfig));
            this.taskRunner.updateTaskConfig(taskConfig);
            this.currentConfig = taskConfig;
        }
        this.receivedTaskConfiguration.set(true);
    }

    private Map<String, String> maskPasswords(Map<String, String> configMap) {
        LinkedHashMap<String, String> newMap = new LinkedHashMap<String, String>(configMap);
        newMap.replaceAll((k, v) -> {
            if (k.toLowerCase(Locale.ROOT).contains("password")) {
                return "****";
            }
            return v;
        });
        return newMap;
    }

    public List<SourceRecord> poll() {
        try {
            return this.taskRunner.poll();
        }
        catch (Exception e) {
            this.reconnectTracker.attemptFailed();
            this.lastConnectionException = e;
            String willRetry = this.reconnectTracker.shouldTryAgain() ? ", will reconnect later" : "";
            this.logger.warning("Exception while polling records" + willRetry, (Throwable)e);
            this.taskRunner.forceRestart();
            return Collections.emptyList();
        }
    }

    public void commitRecord(SourceRecord sourceRecord) {
        try {
            this.taskRunner.commitRecord(sourceRecord);
        }
        catch (Exception e) {
            this.taskRunner.forceRestart();
            this.reconnectTracker.attemptFailed();
            this.lastConnectionException = e;
            String willRetry = this.reconnectTracker.shouldTryAgain() ? ", will reconnect later" : "";
            this.logger.warning("Exception while committing records" + willRetry, (Throwable)e);
        }
    }

    public State copyState() {
        return this.taskRunner.copyState();
    }

    public void restoreState(State state) {
        this.taskRunner.restoreState(state);
    }

    public void commit() {
        try {
            this.taskRunner.commit();
        }
        catch (Exception e) {
            this.taskRunner.forceRestart();
            this.reconnectTracker.attemptFailed();
            this.lastConnectionException = e;
            String willRetry = this.reconnectTracker.shouldTryAgain() ? ", will reconnect later" : "";
            this.logger.warning("Exception while committing records" + willRetry, (Throwable)e);
        }
    }

    public String getTaskRunnerName() {
        return this.taskRunner.getName();
    }

    private static SourceConnector newConnectorInstance(String connectorClazz) {
        try {
            return (SourceConnector)ReflectionUtils.newInstance((ClassLoader)Thread.currentThread().getContextClassLoader(), (String)connectorClazz);
        }
        catch (Exception e) {
            if (e instanceof ClassNotFoundException) {
                throw new HazelcastException("Connector class '" + connectorClazz + "' not found. Did you add the connector jar to the job?", (Throwable)e);
            }
            throw ExceptionUtil.rethrow((Throwable)e);
        }
    }

    public void close() {
        this.logger.info("Stopping connector '" + this.name + "'");
        this.taskRunner.stop();
        this.sourceConnector.stop();
        this.destroyTopic();
        this.logger.info("Connector '" + this.name + "' stopped");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void requestTaskReconfiguration() {
        if (!this.isMasterProcessor) {
            this.logger.fine("requestTaskReconfiguration is skipped because Source Connector is not master");
            return;
        }
        try {
            this.reconfigurationLock.lock();
            this.logger.info("Updating tasks configuration");
            List taskConfigs = this.sourceConnector.taskConfigs(this.tasksMax);
            for (int index = 0; index < taskConfigs.size(); ++index) {
                Map map = (Map)taskConfigs.get(index);
                this.logger.fine("sourceConnector index " + index + " taskConfig=" + map);
            }
            TaskConfigMessage taskConfigMessage = new TaskConfigMessage();
            taskConfigMessage.setTaskConfigs(taskConfigs);
            this.publishMessage(taskConfigMessage);
            this.logger.info(taskConfigs.size() + " task configs were sent");
        }
        finally {
            this.reconfigurationLock.unlock();
        }
    }

    public String toString() {
        return "ConnectorWrapper{name='" + this.name + "', tasksMax=" + this.tasksMax + ", isMasterProcessor=" + this.isMasterProcessor + ", processorOrder=" + this.processorOrder + ", receivedTaskConfiguration=" + this.receivedTaskConfiguration + "}";
    }

    public boolean hasTaskRunner() {
        return this.taskRunner != null;
    }

    private class JetConnectorContext
    implements ConnectorContext {
        private JetConnectorContext() {
        }

        public void requestTaskReconfiguration() {
            SourceConnectorWrapper.this.requestTaskReconfiguration();
        }

        public void raiseError(Exception e) {
            throw ExceptionUtil.rethrow((Throwable)e);
        }
    }
}

