/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TransformationChain;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerSinkTaskContext;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.SinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WorkerSinkTask
extends WorkerTask {
    private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
    // Worker-level configuration; read for the offset flush interval/timeout and for consumer settings.
    private final WorkerConfig workerConfig;
    // The sink task implementation driven by this worker task.
    private final SinkTask task;
    // Task configuration as strings; populated in initialize() and handed to task.start().
    private Map<String, String> taskConfig;
    // Clock used for commit scheduling and for measuring commit duration.
    private final Time time;
    // Converters applied to the raw key / value bytes of each consumed record.
    private final Converter keyConverter;
    private final Converter valueConverter;
    // Applied to every converted record; a null result drops the record from the batch.
    private final TransformationChain<SinkRecord> transformationChain;
    // Created in initialize(); stays null if initialization fails.
    private KafkaConsumer<byte[], byte[]> consumer;
    // Created in initialize() around the consumer and passed to task.initialize().
    private WorkerSinkTaskContext context;
    // Converted records that have not yet been accepted by task.put(); cleared after a successful put.
    private final List<SinkRecord> messageBatch;
    // Offsets recorded at assignment/rewind time or by the most recent successful commit.
    private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
    // Per-partition offset of the next record to consume (last delivered offset + 1).
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
    // Exception captured inside the rebalance listener; rethrown after consumer.poll() returns.
    private RuntimeException rebalanceException;
    // Timestamp (ms) at which the next scheduled offset commit is due.
    private long nextCommit;
    // Incremented for every commit attempt; completions carrying an older number are ignored.
    private int commitSeqno;
    // Timestamp (ms) at which the current commit attempt started; -1 before the first attempt.
    private long commitStarted;
    // Incremented on each failed or timed-out commit; reset to 0 on success.
    private int commitFailures;
    // True after task.put() threw RetriableException and all partitions were paused for a retry.
    private boolean pausedForRedelivery;
    // True while a commit attempt has been started and has not yet completed or timed out.
    private boolean committing;

    public WorkerSinkTask(ConnectorTaskId id, SinkTask task, TaskStatus.Listener statusListener, TargetState initialState, WorkerConfig workerConfig, Converter keyConverter, Converter valueConverter, TransformationChain<SinkRecord> transformationChain, Time time) {
        super(id, statusListener, initialState);
        this.workerConfig = workerConfig;
        this.task = task;
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
        this.transformationChain = transformationChain;
        this.time = time;
        this.messageBatch = new ArrayList<SinkRecord>();
        this.currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        this.pausedForRedelivery = false;
        this.rebalanceException = null;
        this.nextCommit = time.milliseconds() + workerConfig.getLong("offset.flush.interval.ms");
        this.committing = false;
        this.commitSeqno = 0;
        this.commitStarted = -1L;
        this.commitFailures = 0;
    }

    /**
     * Stores the task configuration and creates the consumer and the task context.
     * Any failure is logged and reported through {@code onFailure}; it is not rethrown,
     * so {@code consumer} and {@code context} may remain null afterwards.
     *
     * @param taskConfig configuration for the task; its string originals are kept for task start
     */
    @Override
    public void initialize(TaskConfig taskConfig) {
        try {
            this.taskConfig = taskConfig.originalsStrings();
            this.consumer = createConsumer();
            this.context = new WorkerSinkTaskContext(this.consumer);
        } catch (Throwable t) {
            // Pass the task id for the placeholder; the trailing throwable is logged with its stack trace.
            log.error("Task {} failed initialization and will not be started.", this.id, t);
            onFailure(t);
        }
    }

    /**
     * Requests the task to stop and wakes the consumer so a blocking poll returns promptly.
     * The consumer is null when {@code initialize} failed; in that case there is nothing to wake.
     */
    @Override
    public void stop() {
        super.stop();
        if (consumer != null) {
            consumer.wakeup();
        }
    }

    /**
     * Stops the task, then closes the consumer (if one was created) and the transformation chain.
     * Each step runs even if an earlier one throws, so a failing {@code task.stop()} does not
     * leak the consumer; the exception still propagates to the caller.
     */
    @Override
    protected void close() {
        try {
            task.stop();
        } finally {
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                transformationChain.close();
            }
        }
    }

    /**
     * Records the new target state and wakes the consumer so the work loop can react to it.
     * The consumer is null when {@code initialize} failed; in that case there is nothing to wake.
     *
     * @param state the new target state
     */
    @Override
    public void transitionTo(TargetState state) {
        super.transitionTo(state);
        if (consumer != null) {
            consumer.wakeup();
        }
    }

    /**
     * Main work loop: starts the task, then runs {@link #iteration()} until the task is
     * stopping. Partitions are always closed on the way out, whether the loop ended
     * normally or with an exception.
     */
    @Override
    public void execute() {
        initializeAndStart();
        try {
            while (true) {
                if (isStopping()) {
                    break;
                }
                iteration();
            }
        } finally {
            // Commit what has been delivered and let the task release its partitions.
            closePartitions();
        }
    }

    /**
     * Runs one pass of the work loop: starts a commit if one is due or requested, checks
     * whether the in-flight commit has timed out, then polls and delivers records.
     * A consumer wakeup is treated as a signal to re-check the stop/pause state.
     */
    protected void iteration() {
        final long offsetCommitIntervalMs = workerConfig.getLong("offset.flush.interval.ms");

        try {
            long now = time.milliseconds();

            // Start a commit if none is in flight and one is requested or due.
            if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
                commitOffsets(now, false);
                nextCommit += offsetCommitIntervalMs;
                context.clearCommitRequest();
            }

            // Compute the deadline only after commitOffsets() may have updated commitStarted;
            // a deadline based on the previous start time would expire a fresh commit immediately.
            final long commitTimeoutMs = commitStarted + workerConfig.getLong("offset.flush.timeout.ms");
            if (committing && now >= commitTimeoutMs) {
                log.warn("Commit of {} offsets timed out", this);
                commitFailures++;
                committing = false;
            }

            // Poll no longer than the time remaining until the next scheduled commit.
            long timeoutMs = Math.max(nextCommit - now, 0L);
            poll(timeoutMs);
        } catch (WakeupException we) {
            log.trace("{} consumer woken up", id);

            if (isStopping()) {
                return;
            }

            if (shouldPause()) {
                pauseAll();
                onPause();
                context.requestCommit();
            } else if (!pausedForRedelivery) {
                // Resume only when not pausing; falling through here would undo the pause above.
                resumeAll();
                onResume();
            }
        }
    }

    /**
     * Records the outcome of a commit attempt. Completions whose sequence number does not
     * match the latest attempt are only logged and otherwise ignored.
     *
     * @param error the failure, or null if the commit succeeded
     * @param seqno sequence number of the attempt this completion belongs to
     */
    private void onCommitCompleted(Throwable error, long seqno) {
        if (commitSeqno != seqno) {
            log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", this, seqno, commitSeqno);
            return;
        }

        if (error == null) {
            long elapsedMs = time.milliseconds() - commitStarted;
            log.debug("Finished {} offset commit successfully in {} ms", this, elapsedMs);
            commitFailures = 0;
        } else {
            log.error("Commit of {} offsets threw an unexpected exception: ", this, error);
            commitFailures++;
        }
        committing = false;
    }

    /**
     * @return the current commit failure counter (reset to 0 by a successful commit)
     */
    public int commitFailures() {
        return commitFailures;
    }

    /**
     * Subscribes the consumer to the configured topics, then initializes and starts the task.
     * The {@code topics} setting is a comma-separated list; whitespace around each name is
     * ignored so that values such as {@code "a, b"} subscribe to {@code a} and {@code b}.
     *
     * @throws ConnectException if the {@code topics} setting is missing or empty
     */
    protected void initializeAndStart() {
        log.debug("Initializing task {} ", id);

        String topicsStr = taskConfig.get("topics");
        if (topicsStr == null || topicsStr.isEmpty()) {
            throw new ConnectException("Sink tasks require a list of topics.");
        }

        String[] topics = topicsStr.split(",");
        for (int i = 0; i < topics.length; i++) {
            // Strip surrounding whitespace left by list formatting.
            topics[i] = topics[i].trim();
        }

        log.debug("Task {} subscribing to topics {}", id, (Object) topics);
        consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
        task.initialize(context);
        task.start(taskConfig);
        log.info("Sink task {} finished initialization and start", this);
    }

    /**
     * Applies any pending rewinds, polls the consumer, converts what was returned and
     * hands the batch to the task.
     *
     * @param timeoutMs upper bound for the poll; shortened if the context holds a positive timeout
     */
    protected void poll(long timeoutMs) {
        rewind();

        long pollTimeoutMs = timeoutMs;
        long contextTimeoutMs = context.timeout();
        if (contextTimeoutMs > 0L) {
            pollTimeoutMs = Math.min(pollTimeoutMs, contextTimeoutMs);
            // The context timeout is consumed once it has been applied.
            context.timeout(-1L);
        }

        log.trace("{} polling consumer with timeout {} ms", id, pollTimeoutMs);
        ConsumerRecords<byte[], byte[]> polled = pollConsumer(pollTimeoutMs);
        assert messageBatch.isEmpty() || polled.isEmpty();
        log.trace("{} polling returned {} messages", id, polled.count());

        convertMessages(polled);
        deliverMessages();
    }

    /**
     * Commits the given offsets synchronously. A wakeup during the commit does not abandon
     * it: the commit is retried until it succeeds or fails with another Kafka error, and the
     * most recent wakeup is rethrown afterwards so the caller still sees it.
     *
     * @param offsets offsets to commit
     * @param seqno   sequence number of this commit attempt
     */
    private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno) {
        WakeupException pendingWakeup = null;
        boolean settled = false;

        while (!settled) {
            try {
                consumer.commitSync(offsets);
                lastCommittedOffsets = offsets;
                onCommitCompleted(null, seqno);
                settled = true;
            } catch (WakeupException e) {
                // Remember the wakeup and try the commit again.
                pendingWakeup = e;
            } catch (KafkaException e) {
                onCommitCompleted(e, seqno);
                settled = true;
            }
        }

        if (pendingWakeup != null) {
            throw pendingWakeup;
        }
    }

    /**
     * Commits the given offsets: synchronously when closing, asynchronously otherwise.
     *
     * @param offsets offsets to commit
     * @param closing true when called as part of closing partitions
     * @param seqno   sequence number of this commit attempt
     */
    private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) {
        log.info("{} Committing offsets", this);

        if (closing) {
            doCommitSync(offsets, seqno);
            return;
        }

        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception error) {
                // Only a successful commit advances the last-committed view.
                if (error == null) {
                    lastCommittedOffsets = committed;
                }
                onCommitCompleted(error, seqno);
            }
        });
    }

    /**
     * Starts a commit attempt for the offsets the task reports as safe to commit.
     * Does nothing when no offsets are tracked. The task's {@code preCommit} result is
     * filtered: offsets for partitions that are not tracked, or beyond what has been
     * consumed, are ignored with a warning. When {@code closing} is true the task's
     * partitions are closed after {@code preCommit}, whether or not it succeeded.
     *
     * @param now     current time in ms, recorded as the start of this attempt
     * @param closing true when called as part of closing partitions
     */
    private void commitOffsets(long now, boolean closing) {
        if (currentOffsets.isEmpty()) {
            return;
        }

        committing = true;
        commitSeqno++;
        commitStarted = now;

        final Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets;
        try {
            taskProvidedOffsets = task.preCommit(new HashMap<TopicPartition, OffsetAndMetadata>(currentOffsets));
        } catch (Throwable t) {
            if (closing) {
                // Identify the task and keep the cause; the placeholder previously had no argument.
                log.warn("{} Offset commit failed during close", this, t);
                onCommitCompleted(t, commitSeqno);
            } else {
                log.error("{} Offset commit failed, rewinding to last committed offsets", this, t);
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
                    log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
                    consumer.seek(entry.getKey(), entry.getValue().offset());
                }
                currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(lastCommittedOffsets);
                onCommitCompleted(t, commitSeqno);
            }
            return;
        } finally {
            // Close the task's partitions before any offsets are committed.
            if (closing) {
                task.close(currentOffsets.keySet());
            }
        }

        if (taskProvidedOffsets.isEmpty()) {
            log.debug("{} Skipping offset commit, task opted-out", this);
            onCommitCompleted(null, commitSeqno);
            return;
        }

        // Start from the last committed view and overlay the valid offsets from the task.
        final Map<TopicPartition, OffsetAndMetadata> commitableOffsets =
                new HashMap<TopicPartition, OffsetAndMetadata>(lastCommittedOffsets);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
            final TopicPartition partition = taskProvidedOffsetEntry.getKey();
            final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
            if (commitableOffsets.containsKey(partition)) {
                if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
                    commitableOffsets.put(partition, taskProvidedOffset);
                } else {
                    log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed", partition, taskProvidedOffset);
                }
            } else {
                log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned", partition, taskProvidedOffset);
            }
        }

        if (commitableOffsets.equals(lastCommittedOffsets)) {
            log.debug("{} Skipping offset commit, no change since last commit", this);
            onCommitCompleted(null, commitSeqno);
            return;
        }

        log.trace("{} Offsets to commit: {}", this, commitableOffsets);
        doCommit(commitableOffsets, closing, commitSeqno);
    }

    /**
     * @return a short description of the form {@code WorkerSinkTask{id=...}}
     */
    @Override
    public String toString() {
        return new StringBuilder("WorkerSinkTask{id=").append(id).append('}').toString();
    }

    /**
     * Polls the consumer and then surfaces any exception that was captured by the
     * rebalance listener while the poll was running.
     *
     * @param timeoutMs poll timeout in milliseconds
     * @return the records returned by the consumer
     */
    private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
        ConsumerRecords<byte[], byte[]> polled = consumer.poll(timeoutMs);

        if (rebalanceException == null) {
            return polled;
        }

        // Clear the stored exception before rethrowing so it is reported only once.
        RuntimeException captured = rebalanceException;
        rebalanceException = null;
        throw captured;
    }

    /**
     * Creates the byte-array consumer for this task. Defaults are set first and then
     * overridden by any worker settings carrying the {@code consumer.} prefix.
     *
     * @return the new consumer
     * @throws ConnectException if the consumer cannot be constructed
     */
    private KafkaConsumer<byte[], byte[]> createConsumer() {
        // Values are Objects: the prefixed worker overrides are not restricted to strings.
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("group.id", SinkUtils.consumerGroupId(id.connector()));
        props.put("bootstrap.servers", Utils.join(workerConfig.getList("bootstrap.servers"), ","));
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Applied last so user-supplied consumer settings win over the defaults above.
        props.putAll(workerConfig.originalsWithPrefix("consumer."));

        try {
            return new KafkaConsumer<byte[], byte[]>(props);
        } catch (Throwable t) {
            throw new ConnectException("Failed to create consumer", t);
        }
    }

    /**
     * Converts each consumed record to a {@link SinkRecord}, runs it through the
     * transformation chain and appends it to the pending batch. Records for which the
     * chain returns null are skipped.
     *
     * @param msgs records returned by the last poll
     */
    private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
        for (ConsumerRecord<byte[], byte[]> raw : msgs) {
            log.trace("Consuming message with key {}, value {}", raw.key(), raw.value());

            SchemaAndValue key = keyConverter.toConnectData(raw.topic(), raw.key());
            SchemaAndValue value = valueConverter.toConnectData(raw.topic(), raw.value());

            // Records without a timestamp type carry a null timestamp.
            Long timestamp;
            if (raw.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
                timestamp = null;
            } else {
                timestamp = Long.valueOf(raw.timestamp());
            }

            SinkRecord converted = new SinkRecord(raw.topic(), raw.partition(),
                    key.schema(), key.value(),
                    value.schema(), value.value(),
                    raw.offset(), timestamp, raw.timestampType());

            SinkRecord transformed = transformationChain.apply(converted);
            if (transformed != null) {
                messageBatch.add(transformed);
            }
        }
    }

    /**
     * Resumes every assigned partition except those listed in the context's paused set.
     */
    private void resumeAll() {
        for (TopicPartition partition : consumer.assignment()) {
            boolean pausedInContext = context.pausedPartitions().contains(partition);
            if (!pausedInContext) {
                consumer.resume(Collections.singleton(partition));
            }
        }
    }

    /**
     * Pauses consumption on every partition currently assigned to the consumer.
     */
    private void pauseAll() {
        Collection<TopicPartition> assigned = consumer.assignment();
        consumer.pause(assigned);
    }

    /**
     * Hands the pending batch to the task. On success the tracked offsets advance past
     * every delivered record and the batch is cleared. On {@link RetriableException} the
     * batch is kept and all partitions are paused so the same batch is retried.
     *
     * @throws ConnectException if the task throws anything other than {@link RetriableException};
     *                          the original failure is attached as the cause
     */
    private void deliverMessages() {
        try {
            // Give the task a copy so it cannot modify the pending batch.
            task.put(new ArrayList<SinkRecord>(messageBatch));
            for (SinkRecord record : messageBatch) {
                currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
                        new OffsetAndMetadata(record.kafkaOffset() + 1L));
            }
            messageBatch.clear();

            // A successful put ends redelivery mode; resume unless the task should stay paused.
            if (pausedForRedelivery) {
                if (!shouldPause()) {
                    resumeAll();
                }
                pausedForRedelivery = false;
            }
        } catch (RetriableException e) {
            log.error("RetriableException from SinkTask {}:", id, e);
            // Keep the batch and stop fetching until the retry goes through.
            pausedForRedelivery = true;
            pauseAll();
        } catch (Throwable t) {
            log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
            log.error("Task is being killed and will not recover until manually restarted");
            // Preserve the cause so callers can see why the task died.
            throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.", t);
        }
    }

    /**
     * Applies offset resets recorded in the context: seeks the consumer to each requested
     * offset and aligns both offset maps with it, then clears the requests. Entries with a
     * null offset are skipped with a warning.
     */
    private void rewind() {
        Map<TopicPartition, Long> requested = context.offsets();
        if (requested.isEmpty()) {
            return;
        }

        for (Map.Entry<TopicPartition, Long> request : requested.entrySet()) {
            TopicPartition partition = request.getKey();
            Long target = request.getValue();

            if (target == null) {
                log.warn("Cannot rewind {} to null offset.", partition);
                continue;
            }

            log.trace("Rewind {} to offset {}.", partition, target);
            long position = target.longValue();
            consumer.seek(partition, position);
            lastCommittedOffsets.put(partition, new OffsetAndMetadata(position));
            currentOffsets.put(partition, new OffsetAndMetadata(position));
        }

        context.clearOffsets();
    }

    /**
     * Forwards the given partitions to the task's {@code open} callback.
     *
     * @param partitions partitions passed to the task
     */
    private void openPartitions(Collection<TopicPartition> partitions) {
        task.open(partitions);
    }

    /**
     * Runs a closing commit at the current time; this also closes the task's partitions
     * when there are tracked offsets.
     */
    private void closePartitions() {
        long now = time.milliseconds();
        commitOffsets(now, true);
    }

    private class HandleRebalance
    implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            WorkerSinkTask.this.lastCommittedOffsets = new HashMap();
            WorkerSinkTask.this.currentOffsets = new HashMap();
            for (TopicPartition tp : partitions) {
                long pos = WorkerSinkTask.this.consumer.position(tp);
                WorkerSinkTask.this.lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
                WorkerSinkTask.this.currentOffsets.put(tp, new OffsetAndMetadata(pos));
                log.debug("{} assigned topic partition {} with offset {}", new Object[]{WorkerSinkTask.this.id, tp, pos});
            }
            WorkerSinkTask.this.pausedForRedelivery = false;
            WorkerSinkTask.this.context.pausedPartitions().retainAll(partitions);
            if (WorkerSinkTask.this.shouldPause()) {
                WorkerSinkTask.this.pauseAll();
            } else if (!WorkerSinkTask.this.context.pausedPartitions().isEmpty()) {
                WorkerSinkTask.this.consumer.pause(WorkerSinkTask.this.context.pausedPartitions());
            }
            if (WorkerSinkTask.this.rebalanceException == null || WorkerSinkTask.this.rebalanceException instanceof WakeupException) {
                try {
                    WorkerSinkTask.this.openPartitions(partitions);
                    WorkerSinkTask.this.rewind();
                }
                catch (RuntimeException e) {
                    WorkerSinkTask.this.rebalanceException = e;
                }
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            try {
                WorkerSinkTask.this.closePartitions();
            }
            catch (RuntimeException e) {
                WorkerSinkTask.this.rebalanceException = e;
            }
            WorkerSinkTask.this.messageBatch.clear();
        }
    }
}

