/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.SubmittedRecords;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TransformationChain;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerSourceTaskContext;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreation;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WorkerSourceTask
extends WorkerTask {
    private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
    private static final long SEND_FAILED_BACKOFF_MS = 100L;
    private final WorkerConfig workerConfig;
    private final SourceTask task;
    private final ClusterConfigState configState;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final HeaderConverter headerConverter;
    private final TransformationChain<SourceRecord> transformationChain;
    private final KafkaProducer<byte[], byte[]> producer;
    private final TopicAdmin admin;
    private final CloseableOffsetStorageReader offsetReader;
    private final OffsetStorageWriter offsetWriter;
    private final Executor closeExecutor;
    private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
    private final AtomicReference<Exception> producerSendException;
    private final boolean isTopicTrackingEnabled;
    private final TopicCreation topicCreation;
    private List<SourceRecord> toSend;
    private volatile SubmittedRecords.CommittableOffsets committableOffsets;
    private final SubmittedRecords submittedRecords;
    private final CountDownLatch stopRequestedLatch;
    private Map<String, String> taskConfig;
    private boolean started = false;

    /**
     * Builds the worker-side wrapper around a single {@link SourceTask}.
     *
     * <p>In addition to storing the supplied collaborators, construction leaves the task with nothing queued
     * for sending, an empty set of committable offsets, a new {@link SubmittedRecords} tracker, a one-count
     * stop latch, a new {@link SourceTaskMetricsGroup}, and the topic-tracking / topic-creation settings read
     * from {@code workerConfig}.
     *
     * @param id                         identifier of the task; also used to tag the task's metrics
     * @param task                       the source task whose records are polled and dispatched
     * @param statusListener             passed through to the {@link WorkerTask} constructor
     * @param initialState               passed through to the {@link WorkerTask} constructor
     * @param keyConverter               converts record keys to bytes
     * @param valueConverter             converts record values to bytes
     * @param headerConverter            converts record headers to bytes
     * @param transformationChain        applied to every polled record before conversion
     * @param producer                   producer used to send converted records; {@code null} is tolerated on close
     * @param admin                      admin client used for topic lookup/creation; {@code null} is tolerated on close
     * @param topicGroups                topic creation groups handed to {@link TopicCreation#newTopicCreation}
     * @param offsetReader               offset reader exposed to the task through its context; closed on cancel
     * @param offsetWriter               receives committable offsets and flushes them
     * @param workerConfig               worker configuration
     * @param configState                cluster config state exposed to the task through its context
     * @param connectMetrics             metrics registry for this worker
     * @param loader                     passed through to the {@link WorkerTask} constructor
     * @param time                       passed through to the {@link WorkerTask} constructor
     * @param retryWithToleranceOperator error-handling operator used around transformation, conversion and send
     * @param statusBackingStore         passed through to the {@link WorkerTask} constructor
     * @param closeExecutor              executor on which the producer is closed when the task is cancelled
     */
    public WorkerSourceTask(ConnectorTaskId id, SourceTask task, TaskStatus.Listener statusListener, TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter, TransformationChain<SourceRecord> transformationChain, KafkaProducer<byte[], byte[]> producer, TopicAdmin admin, Map<String, TopicCreationGroup> topicGroups, CloseableOffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter, WorkerConfig workerConfig, ClusterConfigState configState, ConnectMetrics connectMetrics, ClassLoader loader, Time time, RetryWithToleranceOperator retryWithToleranceOperator, StatusBackingStore statusBackingStore, Executor closeExecutor) {
        super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator, time, statusBackingStore);

        // The task itself and the configuration it runs under.
        this.task = task;
        this.workerConfig = workerConfig;
        this.configState = configState;

        // Record pipeline: transform -> convert -> produce.
        this.transformationChain = transformationChain;
        this.headerConverter = headerConverter;
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
        this.producer = producer;
        this.admin = admin;

        // Offset storage and shutdown helpers.
        this.offsetReader = offsetReader;
        this.offsetWriter = offsetWriter;
        this.closeExecutor = closeExecutor;

        // Per-task state; nothing has been polled or submitted yet.
        this.toSend = null;
        this.committableOffsets = SubmittedRecords.CommittableOffsets.EMPTY;
        this.submittedRecords = new SubmittedRecords();
        this.stopRequestedLatch = new CountDownLatch(1);
        this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
        this.producerSendException = new AtomicReference<>();
        this.isTopicTrackingEnabled = workerConfig.getBoolean("topic.tracking.enable");
        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups);
    }

    /**
     * Stores the string form of the task configuration so it can later be handed to the task on start.
     *
     * <p>Any failure while reading the configuration is logged and reported through {@code onFailure}
     * instead of being propagated to the caller.
     *
     * @param taskConfig the configuration for this task
     */
    @Override
    public void initialize(TaskConfig taskConfig) {
        try {
            Map<String, String> originals = taskConfig.originalsStrings();
            this.taskConfig = originals;
        } catch (Throwable failure) {
            log.error("{} Task failed initialization and will not be started.", this, failure);
            onFailure(failure);
        }
    }

    /**
     * Releases everything this worker task owns: the source task (only if a start was attempted), the
     * producer, the admin client, the transformation chain and the retry operator.
     *
     * <p>Each step is best-effort; a failure is logged and the remaining resources are still closed.
     */
    @Override
    protected void close() {
        final Duration closeTimeout = Duration.ofSeconds(30L);

        // Only stop the task if initializeAndStart() was reached; see the 'started' flag.
        if (started) {
            try {
                task.stop();
            } catch (Throwable stopFailure) {
                log.warn("Could not stop task", stopFailure);
            }
        }

        closeProducer(closeTimeout);

        if (admin != null) {
            try {
                admin.close(closeTimeout);
            } catch (Throwable adminFailure) {
                log.warn("Failed to close admin client on time", adminFailure);
            }
        }

        Utils.closeQuietly(transformationChain, "transformation chain");
        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
    }

    /**
     * Closes the source-task metrics group and then lets the superclass remove its own metrics.
     * The superclass call runs even if closing the source-task group throws.
     */
    @Override
    public void removeMetrics() {
        try {
            sourceTaskMetricsGroup.close();
        } finally {
            super.removeMetrics();
        }
    }

    /**
     * Cancels the task: performs the superclass cancellation, closes the offset reader, and schedules an
     * immediate (zero-timeout) producer close.
     *
     * <p>The producer close is submitted to {@code closeExecutor} rather than invoked inline, so this method
     * does not itself wait for the producer to finish closing.
     */
    @Override
    public void cancel() {
        super.cancel();
        offsetReader.close();
        closeExecutor.execute(() -> closeProducer(Duration.ZERO));
    }

    /**
     * Requests that the task stop.
     *
     * <p>After the superclass has handled the request, the stop latch is released so that a back-off wait
     * in {@link #execute()} returns immediately instead of sleeping for its full duration.
     */
    @Override
    public void stop() {
        super.stop();
        stopRequestedLatch.countDown();
    }

    /**
     * Initializes the wrapped source task with a fresh context and starts it with the stored configuration.
     *
     * <p>The {@code started} flag is raised before the task is touched, so {@link #close()} will call
     * {@code task.stop()} even if initialization or start throws part-way through.
     */
    @Override
    protected void initializeAndStart() {
        started = true;
        WorkerSourceTaskContext context = new WorkerSourceTaskContext(offsetReader, this, configState);
        task.initialize(context);
        task.start(taskConfig);
        log.info("{} Source task finished initialization and start", this);
    }

    /**
     * Main loop of the task: until a stop is requested, refresh committable offsets, honor pause requests,
     * poll the source task when nothing is queued, and dispatch the queued batch to Kafka.
     *
     * <p>If a batch cannot be fully dispatched, the loop waits up to {@code SEND_FAILED_BACKOFF_MS} (or
     * until a stop is requested) before trying again. On exit, whether normal, interrupted or via an
     * exception, the method waits for in-flight records up to the offset flush timeout and performs a
     * final offset commit.
     */
    @Override
    public void execute() {
        try {
            log.info("{} Executing source task", this);
            while (!isStopping()) {
                updateCommittableOffsets();

                if (shouldPause()) {
                    onPause();
                    if (awaitUnpause()) {
                        onResume();
                    }
                    continue;
                }

                // Surface any asynchronous producer failure before doing more work.
                maybeThrowProducerSendException();

                if (toSend == null) {
                    log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                    long pollStartMs = time.milliseconds();
                    toSend = poll();
                    if (toSend == null) {
                        // Nothing was returned this round; go back to the top of the loop.
                        continue;
                    }
                    recordPollReturned(toSend.size(), time.milliseconds() - pollStartMs);
                }

                log.trace("{} About to send {} records to Kafka", this, toSend.size());
                if (!sendRecords()) {
                    // Part of the batch is still queued; back off, but wake early on stop().
                    stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
                }
            }
        } catch (InterruptedException ignored) {
            // Deliberately not propagated: the finally block below still runs and the method then returns.
        } finally {
            long flushTimeoutMs = workerConfig.getLong("offset.flush.timeout.ms");
            submittedRecords.awaitAllMessages(flushTimeoutMs, TimeUnit.MILLISECONDS);
            updateCommittableOffsets();
            commitOffsets();
        }
    }

    /**
     * Closes the producer, waiting at most {@code duration}. Does nothing when there is no producer, and
     * only logs a warning if the close fails.
     *
     * @param duration maximum time to wait for the producer to close
     */
    private void closeProducer(Duration duration) {
        if (producer == null) {
            return;
        }
        try {
            producer.close(duration);
        } catch (Throwable closeFailure) {
            log.warn("Could not close producer for {}", id, closeFailure);
        }
    }

    /**
     * Fails the task if a producer send callback has recorded an unrecoverable exception.
     *
     * @throws ConnectException wrapping the recorded exception, if one is present
     */
    private void maybeThrowProducerSendException() {
        Exception sendFailure = producerSendException.get();
        if (sendFailure == null) {
            return;
        }
        throw new ConnectException("Unrecoverable exception from producer send callback", sendFailure);
    }

    /**
     * Folds the offsets that {@code submittedRecords} currently reports as committable into the
     * accumulated {@code committableOffsets}.
     *
     * <p>The snapshot is taken outside the lock; only the field update happens while holding this task's
     * monitor, which is the same monitor {@link #commitOffsets()} holds when it swaps the field out.
     */
    private void updateCommittableOffsets() {
        SubmittedRecords.CommittableOffsets latest = submittedRecords.committableOffsets();
        synchronized (this) {
            committableOffsets = committableOffsets.updatedWith(latest);
        }
    }

    /**
     * Polls the source task once.
     *
     * @return the records returned by the task, or {@code null} if the task returned {@code null} or
     *         threw a retriable exception (which is logged as a warning)
     * @throws InterruptedException if the task's poll is interrupted
     */
    protected List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> polled;
        try {
            polled = task.poll();
        } catch (org.apache.kafka.common.errors.RetriableException | RetriableException retriable) {
            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, retriable);
            polled = null;
        }
        return polled;
    }

    /**
     * Converts a transformed source record into the producer record that will be sent to Kafka.
     *
     * <p>Headers, key and value are each converted through the retry/tolerance operator, in that order.
     *
     * @param record the record produced by the transformation chain; may be {@code null}
     * @return the producer record, or {@code null} if {@code record} is {@code null} or the operator
     *         reports a failure after the conversions
     */
    private ProducerRecord<byte[], byte[]> convertTransformedRecord(SourceRecord record) {
        if (record == null) {
            return null;
        }

        RecordHeaders headers = retryWithToleranceOperator.execute(
                () -> convertHeaderFor(record),
                Stage.HEADER_CONVERTER,
                headerConverter.getClass());

        byte[] key = retryWithToleranceOperator.execute(
                () -> keyConverter.fromConnectData(record.topic(), headers, record.keySchema(), record.key()),
                Stage.KEY_CONVERTER,
                keyConverter.getClass());

        byte[] value = retryWithToleranceOperator.execute(
                () -> valueConverter.fromConnectData(record.topic(), headers, record.valueSchema(), record.value()),
                Stage.VALUE_CONVERTER,
                valueConverter.getClass());

        if (retryWithToleranceOperator.failed()) {
            return null;
        }

        Long timestamp = ConnectUtils.checkAndConvertTimestamp(record.timestamp());
        return new ProducerRecord<>(record.topic(), record.kafkaPartition(), timestamp, key, value, headers);
    }

    /**
     * Transforms, converts and dispatches every record currently queued in {@code toSend}.
     *
     * <p>Records that are filtered out or fail conversion are skipped: the write counter is decremented and
     * the task is notified through {@code commitRecord} with {@code null} metadata. Every other record is
     * registered with {@code submittedRecords} and handed to the producer.
     *
     * <p>If a retriable exception is thrown while creating the topic or sending, the not-yet-dispatched tail
     * of the batch (starting with the record that failed) is kept in {@code toSend} and {@code false} is
     * returned so the caller can back off and retry.
     *
     * @return {@code true} if the whole batch was dispatched (and {@code toSend} was cleared); {@code false}
     *         if part of the batch remains queued for a retry
     * @throws ConnectException if the producer previously reported an unrecoverable failure, or topic
     *                          creation / sending fails with a non-retriable error
     */
    private boolean sendRecords() {
        // Index of the record currently being handled; everything before it has been skipped or dispatched.
        int processed = 0;
        recordBatch(toSend.size());
        // Only null for an empty batch, in which case the loop body never runs.
        SourceRecordWriteCounter counter =
                toSend.size() > 0 ? new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup) : null;

        for (SourceRecord preTransformRecord : toSend) {
            maybeThrowProducerSendException();

            retryWithToleranceOperator.sourceRecord(preTransformRecord);
            SourceRecord record = transformationChain.apply(preTransformRecord);
            ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
            if (producerRecord == null || retryWithToleranceOperator.failed()) {
                counter.skipRecord();
                commitTaskRecord(preTransformRecord, null);
                // A skipped record is finished; count it so that a later retriable failure does not
                // rewind subList() onto records that were already dispatched.
                ++processed;
                continue;
            }

            log.trace("{} Appending record to the topic {} with key {}, value {}", this, record.topic(), record.key(), record.value());
            SubmittedRecords.SubmittedRecord submittedRecord = submittedRecords.submit(record);
            try {
                maybeCreateTopic(record.topic());
                String topic = producerRecord.topic();
                producer.send(producerRecord, (recordMetadata, e) -> {
                    if (e != null) {
                        if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
                            log.trace("Ignoring failed record send: {} failed to send record to {}: ", this, topic, e);
                            // Report through the operator so the configured error handling sees the failure.
                            retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class, preTransformRecord, e);
                            commitTaskRecord(preTransformRecord, null);
                            // The record is being dropped on purpose: settle it in the write counter and ack it
                            // so it stops blocking the offsets of later records in submittedRecords.
                            counter.skipRecord();
                            submittedRecord.ack();
                        } else {
                            log.error("{} failed to send record to {}: ", this, topic, e);
                            log.trace("{} Failed record: {}", this, preTransformRecord);
                            // Only the first failure is kept; it is rethrown on the task thread.
                            producerSendException.compareAndSet(null, e);
                        }
                    } else {
                        submittedRecord.ack();
                        counter.completeRecord();
                        log.trace("{} Wrote record successfully: topic {} partition {} offset {}", this, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                        commitTaskRecord(preTransformRecord, recordMetadata);
                        if (isTopicTrackingEnabled) {
                            recordActiveTopic(producerRecord.topic());
                        }
                    }
                });
            } catch (org.apache.kafka.common.errors.RetriableException | RetriableException retriable) {
                log.warn("{} Failed to send record to topic '{}' and partition '{}'. Backing off before retrying: ", this, producerRecord.topic(), producerRecord.partition(), retriable);
                // Keep the current record and everything after it for the next attempt.
                toSend = toSend.subList(processed, toSend.size());
                // The record was never handed to the producer, so it must not be tracked as in flight.
                submittedRecords.removeLastOccurrence(submittedRecord);
                counter.retryRemaining();
                return false;
            } catch (ConnectException unrecoverable) {
                log.warn("{} Failed to send record to topic '{}' and partition '{}' due to an unrecoverable exception: ", this, producerRecord.topic(), producerRecord.partition(), unrecoverable);
                log.trace("{} Failed to send {} with unrecoverable exception: ", this, producerRecord, unrecoverable);
                throw unrecoverable;
            } catch (KafkaException kafkaFailure) {
                throw new ConnectException("Unrecoverable exception trying to send", kafkaFailure);
            }
            ++processed;
        }
        toSend = null;
        return true;
    }

    /**
     * Ensures {@code topic} exists before the first record is sent to it, when topic creation is required
     * for that topic according to {@code topicCreation}.
     *
     * <p>The topic is first looked up; if absent, it is created from the first matching topic creation
     * group. Once the topic is known to exist it is registered with {@code topicCreation}.
     *
     * @param topic name of the topic the next record will be sent to
     * @throws ConnectException if the topic neither existed nor could be created
     */
    private void maybeCreateTopic(String topic) {
        if (!topicCreation.isTopicCreationRequired(topic)) {
            log.trace("Topic creation by the connector is disabled or the topic {} was previously created.If auto.create.topics.enable is enabled on the broker, the topic will be created with default settings", topic);
            return;
        }

        log.info("The task will send records to topic '{}' for the first time. Checking whether topic exists", topic);
        Map<String, TopicDescription> existing = admin.describeTopics(topic);
        if (!existing.isEmpty()) {
            log.info("Topic '{}' already exists.", topic);
            topicCreation.addTopic(topic);
            return;
        }

        log.info("Creating topic '{}'", topic);
        TopicCreationGroup topicGroup = topicCreation.findFirstGroup(topic);
        log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
        NewTopic newTopic = topicGroup.newTopic(topic);
        TopicAdmin.TopicCreationResponse response = admin.createOrFindTopics(newTopic);
        String requestedName = newTopic.name();

        if (response.isCreated(requestedName)) {
            topicCreation.addTopic(topic);
            log.info("Created topic '{}' using creation group {}", newTopic, topicGroup);
            return;
        }
        if (response.isExisting(requestedName)) {
            topicCreation.addTopic(topic);
            log.info("Found existing topic '{}'", newTopic);
            return;
        }

        // Neither created nor found: give up on this task.
        log.warn("Request to create new topic '{}' failed", topic);
        throw new ConnectException("Task failed to create new topic " + newTopic + ". Ensure that the task is authorized to create topics or that the topic exists and restart the task");
    }

    /**
     * Converts the Connect headers of {@code record} into Kafka record headers using the header converter.
     *
     * @param record the record whose headers are converted
     * @return the converted headers; empty when the record has no headers
     */
    private RecordHeaders convertHeaderFor(SourceRecord record) {
        Headers connectHeaders = record.headers();
        RecordHeaders converted = new RecordHeaders();
        if (connectHeaders == null) {
            return converted;
        }

        String topic = record.topic();
        for (Header connectHeader : connectHeaders) {
            String headerKey = connectHeader.key();
            byte[] rawValue = headerConverter.fromConnectHeader(topic, headerKey, connectHeader.schema(), connectHeader.value());
            converted.add(headerKey, rawValue);
        }
        return converted;
    }

    /**
     * Notifies the source task about the outcome of a single record. Any exception thrown by the task is
     * logged and otherwise ignored.
     *
     * @param record   the original (pre-transformation) record
     * @param metadata the producer metadata for the record, or {@code null} when the record was not written
     */
    private void commitTaskRecord(SourceRecord record, RecordMetadata metadata) {
        try {
            task.commitRecord(record, metadata);
        } catch (Throwable taskFailure) {
            log.error("{} Exception thrown while calling task.commitRecord()", this, taskFailure);
        }
    }

    /**
     * Commits the offsets accumulated since the previous commit.
     *
     * <p>The accumulated {@code committableOffsets} are swapped for an empty instance while holding this
     * task's monitor, handed to the offset writer, and flushed. The flush is bounded by the
     * {@code offset.flush.timeout.ms} worker setting, measured from the start of this call. After a
     * successful commit (including the case where the writer had nothing to flush) the source task's
     * {@code commit()} hook is invoked.
     *
     * @return {@code true} if the offsets were committed or there was nothing to flush; {@code false} if
     *         the flush could not be started, was interrupted, failed, or timed out
     */
    public boolean commitOffsets() {
        long commitTimeoutMs = workerConfig.getLong("offset.flush.timeout.ms");
        log.debug("{} Committing offsets", this);

        long startedMs = time.milliseconds();
        long deadlineMs = startedMs + commitTimeoutMs;

        SubmittedRecords.CommittableOffsets offsetsToCommit;
        synchronized (this) {
            offsetsToCommit = this.committableOffsets;
            this.committableOffsets = SubmittedRecords.CommittableOffsets.EMPTY;
        }

        // Report on the snapshot taken above. The field itself was just reset (and may already be receiving
        // newer offsets), so reading it here would not describe what this commit is about to write.
        if (offsetsToCommit.isEmpty()) {
            log.debug("{} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors.", this);
            // No early return: the writer may still have something to flush, and task.commit() is still called.
        } else {
            log.info("{} Committing offsets for {} acknowledged messages", this, offsetsToCommit.numCommittableMessages());
            if (offsetsToCommit.hasPending()) {
                log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. The source partition with the most pending messages is {}, with {} pending messages", this, offsetsToCommit.numUncommittableMessages(), offsetsToCommit.numDeques(), offsetsToCommit.largestDequePartition(), offsetsToCommit.largestDequeSize());
            } else {
                log.debug("{} There are currently no pending messages for this offset commit; all messages dispatched to the task's producer since the last commit have been acknowledged", this);
            }
        }

        offsetsToCommit.offsets().forEach(offsetWriter::offset);

        if (!offsetWriter.beginFlush()) {
            // The writer reported nothing to flush; still counted as a successful commit.
            long durationMillis = time.milliseconds() - startedMs;
            recordCommitSuccess(durationMillis);
            log.debug("{} Finished offset commitOffsets successfully in {} ms", this, durationMillis);
            commitSourceTask();
            return true;
        }

        Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> {
            if (error != null) {
                log.error("{} Failed to flush offsets to storage: ", this, error);
            } else {
                log.trace("{} Finished flushing offsets to storage", this);
            }
        });
        if (flushFuture == null) {
            // No future to wait on: abandon the flush that beginFlush() started.
            offsetWriter.cancelFlush();
            recordCommitFailure(time.milliseconds() - startedMs, null);
            return false;
        }

        try {
            // Wait only for whatever remains of the overall commit timeout.
            flushFuture.get(Math.max(deadlineMs - time.milliseconds(), 0L), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn("{} Flush of offsets interrupted, cancelling", this);
            offsetWriter.cancelFlush();
            recordCommitFailure(time.milliseconds() - startedMs, e);
            return false;
        } catch (ExecutionException e) {
            log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
            offsetWriter.cancelFlush();
            recordCommitFailure(time.milliseconds() - startedMs, e);
            return false;
        } catch (TimeoutException e) {
            log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
            offsetWriter.cancelFlush();
            recordCommitFailure(time.milliseconds() - startedMs, null);
            return false;
        }

        long durationMillis = time.milliseconds() - startedMs;
        recordCommitSuccess(durationMillis);
        log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
        commitSourceTask();
        return true;
    }

    /**
     * Invokes the source task's {@code commit()} hook. Any exception thrown by the task is logged and
     * otherwise ignored.
     */
    private void commitSourceTask() {
        try {
            task.commit();
        } catch (Throwable taskFailure) {
            log.error("{} Exception thrown while calling task.commit()", this, taskFailure);
        }
    }

    /**
     * @return a short description of the form {@code WorkerSourceTask{id=<task id>}}, used as the prefix
     *         of this class's log messages
     */
    @Override
    public String toString() {
        return "WorkerSourceTask{id=" + id + "}";
    }

    /**
     * Records in the source-task metrics that a poll returned a batch.
     *
     * @param numRecordsInBatch number of records in the returned batch
     * @param duration          time the poll took, as measured by the caller
     */
    protected void recordPollReturned(int numRecordsInBatch, long duration) {
        sourceTaskMetricsGroup.recordPoll(numRecordsInBatch, duration);
    }

    /**
     * @return the metrics group that tracks this task's poll and write activity
     */
    SourceTaskMetricsGroup sourceTaskMetricsGroup() {
        return sourceTaskMetricsGroup;
    }

    /**
     * Sensors describing one source task's activity: records polled, records written, poll batch time,
     * and the number of "active" records (polled but not yet reported as written).
     */
    static class SourceTaskMetricsGroup {
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor sourceRecordPoll;
        private final Sensor sourceRecordWrite;
        private final Sensor sourceRecordActiveCount;
        private final Sensor pollTime;
        // Running count of polled-but-not-yet-written records; never allowed to go below zero.
        private int activeRecordCount;

        /**
         * Obtains the metric group tagged with the connector name and task number and registers all
         * sensors in it.
         *
         * @param id             identifies the connector and task used for the metric tags
         * @param connectMetrics the worker's metrics registry
         */
        public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
            ConnectMetricsRegistry registry = connectMetrics.registry();
            String taskNumber = Integer.toString(id.task());
            metricGroup = connectMetrics.group(registry.sourceTaskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(), taskNumber);
            // The group is closed before any sensor is added -- presumably to clear metrics left behind
            // by an earlier group with the same tags (NOTE(review): confirm against ConnectMetrics).
            metricGroup.close();

            sourceRecordPoll = metricGroup.sensor("source-record-poll");
            sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate());
            sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), new CumulativeSum());

            sourceRecordWrite = metricGroup.sensor("source-record-write");
            sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteRate), new Rate());
            sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), new CumulativeSum());

            pollTime = metricGroup.sensor("poll-batch-time");
            pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new Max());
            pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new Avg());

            sourceRecordActiveCount = metricGroup.sensor("source-record-active-count");
            sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCount), new Value());
            sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountMax), new Max());
            sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountAvg), new Avg());
        }

        /** Closes the underlying metric group. */
        void close() {
            metricGroup.close();
        }

        /**
         * Records a completed poll.
         *
         * @param batchSize number of records the poll returned; added to the active record count
         * @param duration  how long the poll took
         */
        void recordPoll(int batchSize, long duration) {
            sourceRecordPoll.record(batchSize);
            pollTime.record(duration);
            activeRecordCount += batchSize;
            sourceRecordActiveCount.record(activeRecordCount);
        }

        /**
         * Records that a number of records have been written.
         *
         * @param recordCount number of records written; subtracted from the active record count, which
         *                    is clamped at zero
         */
        void recordWrite(int recordCount) {
            sourceRecordWrite.record(recordCount);
            int remaining = activeRecordCount - recordCount;
            activeRecordCount = Math.max(0, remaining);
            sourceRecordActiveCount.record(activeRecordCount);
        }

        /**
         * @return the underlying metric group
         */
        protected ConnectMetrics.MetricGroup metricGroup() {
            return metricGroup;
        }
    }

    /**
     * Counts down the records of one batch as they are skipped or written, and reports the number handled
     * to the metrics group exactly once: either when the count reaches zero or when the rest of the batch
     * is put back for a retry.
     */
    static class SourceRecordWriteCounter {
        private final SourceTaskMetricsGroup metricsGroup;
        private final int batchSize;
        // Set once the batch has been reported, so it is never reported twice.
        private boolean completed = false;
        // Records of the batch that have not yet been skipped or completed.
        private int counter;

        /**
         * @param batchSize    number of records in the batch; expected to be positive
         * @param metricsGroup metrics group that receives the write count; expected to be non-null
         */
        public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
            assert (batchSize > 0);
            assert (metricsGroup != null);
            this.metricsGroup = metricsGroup;
            this.batchSize = batchSize;
            this.counter = batchSize;
        }

        /** Marks one record of the batch as skipped. */
        public void skipRecord() {
            settleOne();
        }

        /** Marks one record of the batch as written. */
        public void completeRecord() {
            settleOne();
        }

        /** Reports the records handled so far because the remainder of the batch will be retried. */
        public void retryRemaining() {
            finishedAllWrites();
        }

        /** Decrements the outstanding count (if positive) and reports the batch when it reaches zero. */
        private void settleOne() {
            if (counter <= 0) {
                return;
            }
            counter--;
            if (counter == 0) {
                finishedAllWrites();
            }
        }

        /** Reports {@code batchSize - counter} written records to the metrics group, at most once. */
        private void finishedAllWrites() {
            if (completed) {
                return;
            }
            metricsGroup.recordWrite(batchSize - counter);
            completed = true;
        }
    }
}

