/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.utils.kafka;

import io.apicurio.registry.utils.kafka.Seek;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point for running actions against a Kafka {@link Consumer}.
 *
 * <p>Callers hand a function to {@link #submit(Function)} and receive a
 * {@link CompletableFuture} for its result. How and on which thread the
 * function is executed is decided by the implementation and is not visible
 * from this interface.
 *
 * @param <K> key type of the underlying consumer
 * @param <V> value type of the underlying consumer
 */
public interface ConsumerActions<K, V> extends AutoCloseable {

    /**
     * Submits an action to be applied to the consumer.
     *
     * @param var1 function that receives the consumer and produces a result
     * @param <R>  result type of the function
     * @return a future for the function's result
     */
    <R> CompletableFuture<R> submit(Function<? super Consumer<K, V>, ? extends R> var1);

    /**
     * Lifecycle hook; the default implementation does nothing.
     */
    default void start() {
    }

    /**
     * Reports whether this instance is running.
     *
     * @return {@code true} in the default implementation
     */
    default boolean isRunning() {
        return true;
    }

    /**
     * Lifecycle hook; the default implementation does nothing.
     */
    default void stop() {
    }

    /**
     * {@link ConsumerActions} extension that adds or removes individual
     * topic-partitions from the consumer's assignment.
     *
     * @param <K> key type of the underlying consumer
     * @param <V> value type of the underlying consumer
     */
    interface DynamicAssignment<K, V> extends ConsumerActions<K, V> {

        /**
         * Adds {@code tp} to the consumer's assignment (if it is not already
         * part of it) and then applies {@code seekOffset} to the consumer for
         * that partition.
         *
         * <p>The seek is applied on every call, even when the partition was
         * already assigned and no reassignment took place.
         *
         * @param tp         topic-partition to add; must not be {@code null}
         * @param seekOffset seek action to apply for {@code tp}; must not be {@code null}
         * @return a future that completes with {@code null} once the submitted action has run
         * @throws NullPointerException if {@code tp} or {@code seekOffset} is {@code null}
         */
        default CompletableFuture<Void> addTopicPartition(TopicPartition tp, Seek.Offset seekOffset) {
            Objects.requireNonNull(tp, "tp");
            Objects.requireNonNull(seekOffset, "seekOffset");
            // Interfaces cannot hold a private static logger on older Java levels,
            // so the logger is looked up per call.
            Logger log = LoggerFactory.getLogger(DynamicAssignment.class);
            log.info("Adding: topic-partition: {} with {}", tp, seekOffset);
            return submit(consumer -> {
                Set<TopicPartition> oldTps = consumer.assignment();
                // Work on a copy: the current assignment is only read, never mutated.
                Set<TopicPartition> newTps = new HashSet<>(oldTps);
                newTps.add(tp);
                // Only reassign when the set of partitions actually changed.
                if (!oldTps.equals(newTps)) {
                    log.info("Reassigning topic-partition(s): {} -> {}", oldTps, newTps);
                    consumer.assign(newTps);
                }
                seekOffset.accept(consumer, tp);
                return null;
            });
        }

        /**
         * Removes {@code tp} from the consumer's assignment if it is currently
         * part of it; otherwise the assignment is left untouched.
         *
         * <p>NOTE: the method name is misspelled ("Parition"); it is kept as is
         * because it is part of the interface's public contract.
         *
         * @param tp topic-partition to remove; must not be {@code null}
         * @return a future that completes with {@code null} once the submitted action has run
         * @throws NullPointerException if {@code tp} is {@code null}
         */
        default CompletableFuture<Void> removeTopicParition(TopicPartition tp) {
            Objects.requireNonNull(tp, "tp");
            Logger log = LoggerFactory.getLogger(DynamicAssignment.class);
            log.info("Removing topic-partition: {}", tp);
            return submit(consumer -> {
                Set<TopicPartition> oldTps = consumer.assignment();
                // Work on a copy: the current assignment is only read, never mutated.
                Set<TopicPartition> newTps = new HashSet<>(oldTps);
                newTps.remove(tp);
                // Only reassign when the set of partitions actually changed.
                if (!oldTps.equals(newTps)) {
                    log.info("Reassigning topic-partition(s): {} -> {}", oldTps, newTps);
                    consumer.assign(newTps);
                }
                return null;
            });
        }
    }
}

