/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorAuditCode;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorErrorCode;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataEventProducer
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventProducer.class);
    private final List<String> sendBuffer = Collections.synchronizedList(new ArrayList());
    private final AuditLog auditLog;
    private final String topicName;
    private final String localServerId;
    private final Properties producerProperties;
    private volatile boolean running = true;
    private Producer<String, String> producer = null;
    private long messageSendCount = 0L;
    private long kafkaSendAttemptCount = 0L;
    private long messagePublishRequestCount = 0L;
    private long inmemoryPutMessageCount = 0L;
    private long kafkaSendFailCount = 0L;
    private long messageFailedSendCount = 0L;

    KafkaOpenMetadataEventProducer(String topicName, String localServerId, Properties producerProperties, AuditLog auditLog) {
        this.auditLog = auditLog;
        this.topicName = topicName;
        this.localServerId = localServerId;
        this.producerProperties = producerProperties;
        String actionDescription = "new producer";
        if (auditLog != null) {
            auditLog.logMessage("new producer", KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_PRODUCER_PROPERTIES.getMessageDefinition(Integer.toString(producerProperties.size()), topicName), producerProperties.toString());
        }
    }

    private void publishEvent(String event) throws ConnectorCheckedException {
        String methodName = "publishEvent";
        String messageFailedCountString = "Metrics: messageFailedSendCount {}";
        boolean eventSent = false;
        long eventRetryCount = 0L;
        ++this.messagePublishRequestCount;
        log.debug("Metrics: messagePublishRequestCount {}", (Object)this.messagePublishRequestCount);
        if (this.producer == null) {
            try {
                log.debug("Creating new producer for topic {}", (Object)this.topicName);
                this.producer = new KafkaProducer(this.producerProperties);
            }
            catch (Exception error) {
                if (this.auditLog != null) {
                    this.auditLog.logException("publishEvent", KafkaOpenMetadataTopicConnectorAuditCode.ERROR_CONNECTING_KAFKA_PRODUCER.getMessageDefinition(this.topicName), (Throwable)error);
                }
                throw new ConnectorCheckedException(KafkaOpenMetadataTopicConnectorErrorCode.ERROR_CONNECTING_KAFKA_PRODUCER.getMessageDefinition(error.getMessage()), this.getClass().getName(), "publishEvent", (Throwable)error);
            }
        }
        while (!eventSent) {
            try {
                log.debug("Sending message try {} [0 based] : {}", (Object)eventRetryCount, (Object)event);
                ProducerRecord producerRecord = new ProducerRecord(this.topicName, (Object)this.localServerId, (Object)event);
                ++this.kafkaSendAttemptCount;
                log.debug("Metrics: kafkaSendAttemptCount {}", (Object)this.kafkaSendAttemptCount);
                this.producer.send(producerRecord).get();
                eventSent = true;
                ++this.messageSendCount;
                log.debug("Metrics: messageSendCount {}", (Object)this.messageSendCount);
            }
            catch (ExecutionException error) {
                ++this.kafkaSendFailCount;
                log.debug("Metrics: kafkaSendFailCount {}", (Object)this.kafkaSendFailCount);
                log.debug("Kafka had trouble sending event: {} : Exception  message is {}", (Object)event, (Object)error.getMessage());
                if (!this.isExceptionRetryable(error)) {
                    log.debug("Exception not retryable, closing producer");
                    this.producer.close();
                    this.producer = null;
                    ++this.messageFailedSendCount;
                    log.warn("Metrics: messageFailedSendCount {}", (Object)this.messageFailedSendCount);
                    throw new ConnectorCheckedException(KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(error.getClass().getName(), this.topicName, error.getMessage()), this.getClass().getName(), "publishEvent", (Throwable)error);
                }
                if (eventRetryCount == 10L) {
                    this.producer.close();
                    this.producer = null;
                    ++this.messageFailedSendCount;
                    log.warn("Metrics: messageFailedSendCount {}", (Object)this.messageFailedSendCount);
                    log.error("Retryable Exception closed producer after {} tries", (Object)eventRetryCount);
                    break;
                }
                if (eventRetryCount == 0L) {
                    log.debug("Retrying event warning - count is {}", (Object)eventRetryCount);
                    if (this.auditLog != null) {
                        this.auditLog.logMessage("publishEvent", KafkaOpenMetadataTopicConnectorAuditCode.EVENT_SEND_IN_ERROR_LOOP.getMessageDefinition(this.topicName, Long.toString(this.messageSendCount), Long.toString(this.getSendBufferSize()), error.getMessage()));
                    }
                }
                ++eventRetryCount;
            }
            catch (WakeupException error) {
                log.warn("Wake up for shut down");
            }
            catch (Exception error) {
                if (this.producer != null) {
                    this.producer.close();
                    this.producer = null;
                }
                log.warn("Closed producer due to Exception in sendEvent {}", (Object)error.getMessage());
                if (error instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                ++this.messageFailedSendCount;
                log.warn("Metrics: messageFailedSendCount {}", (Object)this.messageFailedSendCount);
                throw new ConnectorCheckedException(KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(error.getClass().getName(), this.topicName, error.getMessage()), this.getClass().getName(), "publishEvent", (Throwable)error);
            }
        }
    }

    @Override
    public void run() {
        String listenerThreadName = this.topicName + "/" + Thread.currentThread().getName();
        String actionDescription = listenerThreadName + ":run";
        Thread.currentThread().setName(listenerThreadName);
        if (this.auditLog != null) {
            this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_PRODUCER_START.getMessageDefinition(this.topicName, String.valueOf(this.sendBuffer.size())), this.producerProperties.toString());
        }
        log.info("Main loop started for topic {}", (Object)this.topicName);
        int sleepTime = 1000;
        while (this.isRunning()) {
            try {
                String bufferedEvent = this.getEvent();
                if (bufferedEvent == null) {
                    TimeUnit.MILLISECONDS.sleep(sleepTime);
                    continue;
                }
                log.debug("Processing buffered events");
                while (bufferedEvent != null) {
                    this.publishEvent(bufferedEvent);
                    bufferedEvent = this.getEvent();
                }
            }
            catch (InterruptedException error) {
                log.debug("Woken up from sleep ");
                Thread.currentThread().interrupt();
            }
            catch (Exception error) {
                log.warn("Bad exception from sending events: {}", (Object)error.getMessage());
                if (!this.isExceptionRetryable(error)) break;
                log.debug("Trying to recover");
                this.recoverAfterError();
            }
        }
        log.info("Exiting main loop for topic {} & cleaning up", (Object)this.topicName);
        if (this.producer != null) {
            log.debug("");
            this.producer.close();
            this.producer = null;
        }
        if (this.auditLog != null) {
            this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_PRODUCER_SHUTDOWN.getMessageDefinition(this.topicName, Integer.toString(this.getSendBufferSize()), Long.toString(this.messageSendCount)), this.producerProperties.toString());
        }
    }

    private void putEvent(String newEvent) {
        ++this.inmemoryPutMessageCount;
        log.debug("Metrics: inmemoryPutMessageCount {}", (Object)this.inmemoryPutMessageCount);
        log.debug("Metrics: sendBufferSize {}", (Object)this.sendBuffer.size());
        this.sendBuffer.add(newEvent);
    }

    private int getSendBufferSize() {
        return this.sendBuffer.size();
    }

    private String getEvent() {
        if (this.sendBuffer.isEmpty()) {
            return null;
        }
        return this.sendBuffer.remove(0);
    }

    public void sendEvent(String event) {
        this.putEvent(event);
    }

    protected void recoverAfterError() {
        long recoverySleepTimeSec = 10L;
        log.info("Waiting {} seconds to recover", (Object)recoverySleepTimeSec);
        try {
            Thread.sleep(recoverySleepTimeSec * 1000L);
        }
        catch (InterruptedException e1) {
            log.debug("Interrupted while recovering with exception: {}", (Object)e1.getMessage());
            Thread.currentThread().interrupt();
        }
    }

    public void safeCloseProducer() {
        this.stopRunning();
    }

    private boolean isRunning() {
        return this.running;
    }

    private synchronized void stopRunning() {
        this.running = false;
    }

    private boolean isExceptionRetryable(Exception error) {
        Throwable nested = null;
        while ((nested = error.getCause()) != null) {
            if (nested instanceof RetriableException) {
                return true;
            }
            error = new Exception(error.getCause());
        }
        return false;
    }
}

