/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.utils.kafka;

import io.apicurio.registry.utils.kafka.ConsumerChainedExceptionHandler;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exception handler that recovers from a {@link SerializationException} by seeking the
 * consumer past the offending record.
 *
 * <p>The topic, partition and offset of the bad record are parsed out of the exception
 * message, which is expected to contain the text
 * {@code "partition <topic>-<partition> at offset <offset>"}. Any exception that is not a
 * {@link SerializationException}, or whose message is absent or does not contain that text,
 * is passed on to the superclass handler.
 */
public class ConsumerSkipRecordsSerializationExceptionHandler
extends ConsumerChainedExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(ConsumerSkipRecordsSerializationExceptionHandler.class);

    /**
     * Extracts topic (group 1), partition (group 2) and offset (group 3) from the exception
     * message. The topic group is reluctant so that topic names which themselves contain
     * {@code -} are still captured whole, leaving only the trailing number as the partition.
     */
    private static final Pattern EXCEPTION_PATTERN = Pattern.compile("partition (.*?)-(\\d+) at offset (\\d+)");

    /**
     * Creates a handler using the superclass's no-argument constructor.
     */
    public ConsumerSkipRecordsSerializationExceptionHandler() {
    }

    /**
     * Creates a handler, passing {@code nextHandler} to the superclass constructor.
     *
     * @param nextHandler handler handed to the superclass (presumably invoked for exceptions
     *                    this handler does not deal with; confirm against
     *                    {@code ConsumerChainedExceptionHandler})
     */
    public ConsumerSkipRecordsSerializationExceptionHandler(BiConsumer<Consumer<?, ?>, RuntimeException> nextHandler) {
        super(nextHandler);
    }

    /**
     * Skips the record that caused a {@link SerializationException}, or delegates to the
     * superclass when the failing position cannot be determined.
     *
     * @param consumer the consumer on which the exception was raised; it is repositioned to
     *                 the offset right after the failing record
     * @param e        the exception to handle
     */
    @Override
    public void accept(Consumer<?, ?> consumer, RuntimeException e) {
        // A SerializationException may carry no message at all; Pattern.matcher(null) would
        // throw a NullPointerException, so a missing message is treated like a non-match.
        String message = e instanceof SerializationException ? e.getMessage() : null;
        Matcher m = message == null ? null : EXCEPTION_PATTERN.matcher(message);
        if (m == null || !m.find()) {
            super.accept(consumer, e);
            return;
        }

        String topic = m.group(1);
        int partition = Integer.parseInt(m.group(2));
        long errorOffset = Long.parseLong(m.group(3));
        TopicPartition tp = new TopicPartition(topic, partition);
        long currentOffset = consumer.position(tp);
        // The exception is passed as the trailing argument with no matching placeholder so
        // that the logger records it as the throwable (with stack trace).
        log.error("SerializationException - skipping records in partition {} from offset {} up to and including error offset {}", tp, currentOffset, errorOffset, e);
        // Resume consumption with the record immediately after the one that failed.
        consumer.seek(tp, errorOffset + 1L);
    }
}

