/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerSinkTask;
import org.apache.kafka.connect.util.ShutdownableThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WorkerSinkTaskThread
extends ShutdownableThread {
    private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
    private final WorkerSinkTask task;
    private long nextCommit;
    private boolean committing;
    private int commitSeqno;
    private long commitStarted;
    private int commitFailures;

    public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time, WorkerConfig workerConfig) {
        super(name);
        this.task = task;
        this.nextCommit = time.milliseconds() + workerConfig.getLong("offset.flush.interval.ms");
        this.committing = false;
        this.commitSeqno = 0;
        this.commitStarted = -1L;
        this.commitFailures = 0;
    }

    @Override
    public void execute() {
        if (!this.task.joinConsumerGroupAndStart()) {
            return;
        }
        while (this.getRunning()) {
            this.iteration();
        }
        this.task.commitOffsets(true, -1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void iteration() {
        long now = this.task.time().milliseconds();
        if (!this.committing && now >= this.nextCommit) {
            WorkerSinkTaskThread workerSinkTaskThread = this;
            synchronized (workerSinkTaskThread) {
                this.committing = true;
                ++this.commitSeqno;
                this.commitStarted = now;
            }
            this.task.commitOffsets(false, this.commitSeqno);
            this.nextCommit += this.task.workerConfig().getLong("offset.flush.interval.ms").longValue();
        }
        long commitTimeout = this.commitStarted + this.task.workerConfig().getLong("offset.flush.timeout.ms");
        if (this.committing && now >= commitTimeout) {
            log.warn("Commit of {} offsets timed out", (Object)this);
            ++this.commitFailures;
            this.committing = false;
        }
        long timeoutMs = Math.max(this.nextCommit - now, 0L);
        this.task.poll(timeoutMs);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onCommitCompleted(Throwable error, long seqno) {
        WorkerSinkTaskThread workerSinkTaskThread = this;
        synchronized (workerSinkTaskThread) {
            if ((long)this.commitSeqno != seqno) {
                log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", new Object[]{this, seqno, this.commitSeqno});
            } else {
                if (error != null) {
                    log.error("Commit of {} offsets threw an unexpected exception: ", (Object)this, (Object)error);
                    ++this.commitFailures;
                } else {
                    log.debug("Finished {} offset commit successfully in {} ms", (Object)this, (Object)(this.task.time().milliseconds() - this.commitStarted));
                    this.commitFailures = 0;
                }
                this.committing = false;
            }
        }
    }

    public int commitFailures() {
        return this.commitFailures;
    }
}

