/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
class BufferedMutatorOverAsyncBufferedMutator
implements BufferedMutator {
    private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorOverAsyncBufferedMutator.class);
    private final AsyncBufferedMutator mutator;
    private final BufferedMutator.ExceptionListener listener;
    private final Set<CompletableFuture<Void>> futures = ConcurrentHashMap.newKeySet();
    private final AtomicLong bufferedSize = new AtomicLong(0L);
    private final ConcurrentLinkedQueue<Pair<Mutation, Throwable>> errors = new ConcurrentLinkedQueue();
    private static final Pattern ADDR_MSG_MATCHER = Pattern.compile("Call to (\\S+) failed");

    BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator, BufferedMutator.ExceptionListener listener) {
        this.mutator = mutator;
        this.listener = listener;
    }

    @Override
    public TableName getName() {
        return this.mutator.getName();
    }

    @Override
    public Configuration getConfiguration() {
        return this.mutator.getConfiguration();
    }

    @Override
    public void mutate(Mutation mutation) throws IOException {
        this.mutate(Collections.singletonList(mutation));
    }

    private String getHostnameAndPort(Throwable error) {
        Matcher matcher = ADDR_MSG_MATCHER.matcher(error.getMessage());
        if (matcher.matches()) {
            return matcher.group(1);
        }
        return "";
    }

    private RetriesExhaustedWithDetailsException makeError() {
        Pair<Mutation, Throwable> pair;
        ArrayList<Row> rows = new ArrayList<Row>();
        ArrayList<Throwable> throwables = new ArrayList<Throwable>();
        ArrayList<String> hostnameAndPorts = new ArrayList<String>();
        while ((pair = this.errors.poll()) != null) {
            rows.add((Row)pair.getFirst());
            throwables.add((Throwable)pair.getSecond());
            hostnameAndPorts.add(this.getHostnameAndPort((Throwable)pair.getSecond()));
        }
        return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts);
    }

    private void internalFlush() throws RetriesExhaustedWithDetailsException {
        CompletableFuture[] toWait = this.futures.toArray(new CompletableFuture[0]);
        this.mutator.flush();
        try {
            CompletableFuture.allOf(toWait).join();
        }
        catch (CompletionException e) {
            LOG.debug("Flush failed, you should get an exception thrown to your code", (Throwable)e);
        }
        if (!this.errors.isEmpty()) {
            RetriesExhaustedWithDetailsException error = this.makeError();
            this.listener.onException(error, this);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void mutate(List<? extends Mutation> mutations) throws IOException {
        List<CompletableFuture<Void>> fs = this.mutator.mutate(mutations);
        int n = fs.size();
        for (int i = 0; i < n; ++i) {
            CompletableFuture toComplete = new CompletableFuture();
            this.futures.add(toComplete);
            Mutation mutation = mutations.get(i);
            long heapSize = mutation.heapSize();
            this.bufferedSize.addAndGet(heapSize);
            FutureUtils.addListener(fs.get(i), (r, e) -> {
                this.futures.remove(toComplete);
                this.bufferedSize.addAndGet(-heapSize);
                if (e != null) {
                    this.errors.add((Pair<Mutation, Throwable>)Pair.newPair((Object)mutation, (Object)e));
                    toComplete.completeExceptionally((Throwable)e);
                } else {
                    toComplete.complete(r);
                }
            });
        }
        BufferedMutatorOverAsyncBufferedMutator bufferedMutatorOverAsyncBufferedMutator = this;
        synchronized (bufferedMutatorOverAsyncBufferedMutator) {
            if (this.bufferedSize.get() > this.mutator.getWriteBufferSize() * 2L) {
                this.internalFlush();
            } else if (!this.errors.isEmpty()) {
                RetriesExhaustedWithDetailsException error = this.makeError();
                this.listener.onException(error, this);
            }
        }
    }

    @Override
    public synchronized void close() throws IOException {
        this.internalFlush();
        this.mutator.close();
    }

    @Override
    public synchronized void flush() throws IOException {
        this.internalFlush();
    }

    @Override
    public long getWriteBufferSize() {
        return this.mutator.getWriteBufferSize();
    }

    @Override
    public void setRpcTimeout(int timeout) {
    }

    @Override
    public void setOperationTimeout(int timeout) {
    }
}

