/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Murmur3_32Hash;

public class ConsistentHashingStickyKeyConsumerSelector
implements StickyKeyConsumerSelector {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final NavigableMap<Integer, List<Consumer>> hashRing = new TreeMap<Integer, List<Consumer>>();
    private final int numberOfPoints;

    public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
        this.numberOfPoints = numberOfPoints;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addConsumer(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
        this.rwLock.writeLock().lock();
        try {
            for (int i = 0; i < this.numberOfPoints; ++i) {
                String key = consumer.consumerName() + i;
                int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
                this.hashRing.compute(hash, (k, v) -> {
                    if (v == null) {
                        return Lists.newArrayList(consumer);
                    }
                    if (!v.contains(consumer)) {
                        v.add(consumer);
                        v.sort(Comparator.comparing(Consumer::consumerName, String::compareTo));
                    }
                    return v;
                });
            }
        }
        finally {
            this.rwLock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeConsumer(Consumer consumer) {
        this.rwLock.writeLock().lock();
        try {
            for (int i = 0; i < this.numberOfPoints; ++i) {
                String key = consumer.consumerName() + i;
                int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
                this.hashRing.compute(hash, (k, v) -> {
                    if (v == null) {
                        return null;
                    }
                    v.removeIf(c -> c.consumerName().equals(consumer.consumerName()));
                    if (v.isEmpty()) {
                        v = null;
                    }
                    return v;
                });
            }
        }
        finally {
            this.rwLock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Consumer select(int hash) {
        this.rwLock.readLock().lock();
        try {
            if (this.hashRing.isEmpty()) {
                Consumer consumer = null;
                return consumer;
            }
            Map.Entry<Integer, List<Consumer>> ceilingEntry = this.hashRing.ceilingEntry(hash);
            List<Consumer> consumerList = ceilingEntry != null ? ceilingEntry.getValue() : this.hashRing.firstEntry().getValue();
            Consumer consumer = consumerList.get(hash % consumerList.size());
            return consumer;
        }
        finally {
            this.rwLock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
        LinkedHashMap<Consumer, List<Range>> result = new LinkedHashMap<Consumer, List<Range>>();
        this.rwLock.readLock().lock();
        try {
            int start = 0;
            for (Map.Entry entry : this.hashRing.entrySet()) {
                for (Consumer consumer : (List)entry.getValue()) {
                    result.computeIfAbsent(consumer, key -> new ArrayList()).add(Range.of(start, (Integer)entry.getKey()));
                }
                start = (Integer)entry.getKey() + 1;
            }
        }
        finally {
            this.rwLock.readLock().unlock();
        }
        return result;
    }

    Map<Integer, List<Consumer>> getRangeConsumer() {
        return Collections.unmodifiableMap(this.hashRing);
    }
}

