/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka;

import java.util.Collection;
import java.util.Collections;
import org.apache.camel.component.kafka.DefaultKafkaManualCommit;
import org.apache.camel.component.kafka.KafkaAsyncManualCommit;
import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultKafkaManualAsyncCommit
extends DefaultKafkaManualCommit
implements KafkaAsyncManualCommit {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaManualAsyncCommit.class);
    private final Collection<KafkaAsyncManualCommit> asyncCommits;

    public DefaultKafkaManualAsyncCommit(KafkaConsumer consumer, String topicName, String threadId, StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) {
        super(consumer, topicName, threadId, offsetRepository, partition, recordOffset, commitTimeout);
        this.asyncCommits = asyncCommits;
        LOG.debug("Using commit timeout of {}", (Object)commitTimeout);
    }

    @Override
    public void commit() {
        this.asyncCommits.add(this);
    }

    @Override
    public void processAsyncCommit() {
        this.commitAsyncOffset(this.getOffsetRepository(), this.getPartition(), this.getRecordOffset());
    }

    protected void commitAsyncOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset) {
        if (recordOffset != -1L) {
            if (offsetRepository != null) {
                offsetRepository.setState((Object)this.serializeOffsetKey(partition), (Object)this.serializeOffsetValue(recordOffset));
            } else {
                LOG.debug("CommitAsync {} from topic {} with offset: {}", new Object[]{this.getThreadId(), this.getTopicName(), recordOffset});
                this.getConsumer().commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(recordOffset + 1L)), (offsets, exception) -> {
                    if (exception != null) {
                        LOG.error("Error during async commit for {} from topic {} with offset {}: ", new Object[]{this.getThreadId(), this.getTopicName(), recordOffset, exception});
                    } else {
                        LOG.debug("CommitAsync done for {} from topic {} with offset: {}", new Object[]{this.getThreadId(), this.getTopicName(), recordOffset});
                    }
                });
            }
        }
    }
}

