/*
 * Decompiled with CFR 0.152.
 */
package com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.async;

import com.google.bigtable.repackaged.com.google.api.client.util.NanoClock;
import com.google.bigtable.repackaged.com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.repackaged.com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.repackaged.com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.repackaged.com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.config.BulkOptions;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.config.Logger;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.BigtableDataClient;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.BigtableTableName;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.async.OperationAccountant;
import com.google.bigtable.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.bigtable.repackaged.com.google.common.base.Preconditions;
import com.google.bigtable.repackaged.com.google.common.util.concurrent.FutureCallback;
import com.google.bigtable.repackaged.com.google.common.util.concurrent.Futures;
import com.google.bigtable.repackaged.com.google.common.util.concurrent.ListenableFuture;
import com.google.bigtable.repackaged.com.google.common.util.concurrent.SettableFuture;
import com.google.bigtable.repackaged.com.google.protobuf.Any;
import com.google.bigtable.repackaged.io.grpc.Status;
import com.google.bigtable.repackaged.io.grpc.StatusRuntimeException;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics;
import com.google.cloud.bigtable.metrics.Meter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class BulkMutation {
    private static final StatusRuntimeException MISSING_ENTRY_EXCEPTION = Status.INTERNAL.withDescription("Mutation does not have a status").asRuntimeException();
    @VisibleForTesting
    static Logger LOG = new Logger(BulkMutation.class);
    public static final long MAX_RPC_WAIT_TIME_NANOS = TimeUnit.MINUTES.toNanos(12L);
    @VisibleForTesting
    Batch currentBatch = null;
    private ScheduledFuture<?> scheduledFlush = null;
    private final String tableName;
    private final BigtableDataClient client;
    private final OperationAccountant operationAccountant;
    private final ScheduledExecutorService retryExecutorService;
    private final int maxRowKeyCount;
    private final long maxRequestSize;
    private final long autoflushMs;
    private final Meter batchMeter = BigtableClientMetrics.meter(BigtableClientMetrics.MetricLevel.Info, "bulk-mutator.batch.meter");
    @VisibleForTesting
    NanoClock clock = NanoClock.SYSTEM;

    private static StatusRuntimeException toException(com.google.bigtable.repackaged.com.google.rpc.Status status) {
        Status grpcStatus = Status.fromCodeValue(status.getCode()).withDescription(status.getMessage());
        for (Any detail : status.getDetailsList()) {
            grpcStatus = grpcStatus.augmentDescription(detail.toString());
        }
        return grpcStatus.asRuntimeException();
    }

    private static MutateRowsRequest.Entry convert(MutateRowRequest request) {
        if (request == null) {
            return null;
        }
        return MutateRowsRequest.Entry.newBuilder().setRowKey(request.getRowKey()).addAllMutations(request.getMutationsList()).build();
    }

    private static Set<Integer> getIndexes(List<MutateRowsResponse.Entry> entries) {
        HashSet<Integer> indexes = new HashSet<Integer>(entries.size());
        for (MutateRowsResponse.Entry entry : entries) {
            indexes.add((int)entry.getIndex());
        }
        return indexes;
    }

    private static void cancelIfNotDone(Future<?> future) {
        if (future != null && !future.isDone()) {
            future.cancel(true);
        }
    }

    public BulkMutation(BigtableTableName tableName, BigtableDataClient client, ScheduledExecutorService retryExecutorService, BulkOptions bulkOptions) {
        this(tableName, client, new OperationAccountant(), retryExecutorService, bulkOptions);
    }

    BulkMutation(BigtableTableName tableName, BigtableDataClient client, OperationAccountant operationAccountant, ScheduledExecutorService retryExecutorService, BulkOptions bulkOptions) {
        this.tableName = tableName.toString();
        this.client = client;
        this.retryExecutorService = retryExecutorService;
        this.operationAccountant = operationAccountant;
        this.maxRowKeyCount = bulkOptions.getBulkMaxRowKeyCount();
        this.maxRequestSize = bulkOptions.getBulkMaxRequestSize();
        this.autoflushMs = bulkOptions.getAutoflushMs();
    }

    public ListenableFuture<MutateRowResponse> add(MutateRowRequest request) {
        return this.add(BulkMutation.convert(request));
    }

    public synchronized ListenableFuture<MutateRowResponse> add(MutateRowsRequest.Entry entry) {
        Preconditions.checkNotNull(entry, "Request null");
        Preconditions.checkArgument(!entry.getRowKey().isEmpty(), "Request has an empty rowkey");
        if (this.currentBatch == null) {
            this.batchMeter.mark();
            this.currentBatch = new Batch();
        }
        ListenableFuture future = this.currentBatch.add(entry);
        if (this.currentBatch.isFull()) {
            this.send();
            if (this.scheduledFlush != null) {
                this.scheduledFlush.cancel(true);
                this.scheduledFlush = null;
            }
        } else if (this.autoflushMs > 0L && this.currentBatch != null && this.scheduledFlush == null) {
            this.scheduledFlush = this.retryExecutorService.schedule(new Runnable(){

                @Override
                public void run() {
                    BulkMutation.this.scheduledFlush = null;
                    BulkMutation.this.send();
                }
            }, this.autoflushMs, TimeUnit.MILLISECONDS);
        }
        return future;
    }

    public void flush() throws InterruptedException {
        this.send();
        this.operationAccountant.awaitCompletion();
    }

    @VisibleForTesting
    synchronized void send() {
        if (this.currentBatch != null) {
            try {
                this.currentBatch.run();
            }
            finally {
                this.operationAccountant.registerOperation(this.currentBatch.completionFuture);
            }
            this.currentBatch = null;
        }
    }

    public boolean isFlushed() {
        return this.currentBatch == null;
    }

    static /* synthetic */ String access$000(BulkMutation x0) {
        return x0.tableName;
    }

    @VisibleForTesting
    class Batch {
        private final Meter mutationMeter = BigtableClientMetrics.meter(BigtableClientMetrics.MetricLevel.Info, "bulk-mutator.mutations.added");
        private final SettableFuture<String> completionFuture = SettableFuture.create();
        private final List<SettableFuture<MutateRowResponse>> futures = new ArrayList<SettableFuture<MutateRowResponse>>();
        private final MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder().setTableName(BulkMutation.access$000(BulkMutation.this));
        private ListenableFuture<List<MutateRowsResponse>> mutateRowsFuture;
        private ScheduledFuture<?> stalenessFuture;
        private long approximateByteSize = 0L;
        @VisibleForTesting
        Long lastRpcSentTimeNanos = null;

        Batch() {
        }

        private ListenableFuture<MutateRowResponse> add(MutateRowsRequest.Entry entry) {
            Preconditions.checkNotNull(entry);
            SettableFuture<MutateRowResponse> future = SettableFuture.create();
            this.mutationMeter.mark();
            this.futures.add(future);
            this.builder.addEntries(entry);
            this.approximateByteSize += (long)entry.getSerializedSize();
            return future;
        }

        private boolean isFull() {
            Preconditions.checkNotNull(this.futures.isEmpty());
            return this.getRequestCount() >= BulkMutation.this.maxRowKeyCount || this.approximateByteSize >= BulkMutation.this.maxRequestSize;
        }

        private void addCallback(ListenableFuture<List<MutateRowsResponse>> bulkFuture) {
            Futures.addCallback(bulkFuture, new FutureCallback<List<MutateRowsResponse>>(){

                @Override
                public void onSuccess(List<MutateRowsResponse> result) {
                    Batch.this.handleResult(result);
                }

                @Override
                public void onFailure(Throwable t) {
                    Batch.this.setFailure(t);
                }
            });
        }

        @VisibleForTesting
        synchronized void handleResult(List<MutateRowsResponse> results) {
            if (this.futures.isEmpty()) {
                LOG.warn("Got duplicate responses for bulk mutation.", new Object[0]);
                this.setComplete();
                return;
            }
            ArrayList<MutateRowsResponse.Entry> entries = new ArrayList<MutateRowsResponse.Entry>();
            for (MutateRowsResponse response : results) {
                entries.addAll(response.getEntriesList());
            }
            if (entries.isEmpty()) {
                this.setFailure(Status.INTERNAL.withDescription("No MutateRowsResponses entries were found.").asRuntimeException());
                return;
            }
            try {
                this.handleEntries(entries);
                this.handleExtraFutures(entries);
                this.setComplete();
            }
            catch (Throwable e) {
                this.setFailure(e);
            }
        }

        private void handleEntries(Iterable<MutateRowsResponse.Entry> entries) {
            for (MutateRowsResponse.Entry entry : entries) {
                int index = (int)entry.getIndex();
                if (index >= this.getRequestCount()) {
                    LOG.error("Got extra status: %s", entry);
                    continue;
                }
                SettableFuture<MutateRowResponse> future = this.futures.get(index);
                if (future == null) {
                    LOG.warn("Could not find a future for index %d.", index);
                    continue;
                }
                com.google.bigtable.repackaged.com.google.rpc.Status status = entry.getStatus();
                if (status.getCode() == Status.Code.OK.value()) {
                    future.set(MutateRowResponse.getDefaultInstance());
                    continue;
                }
                future.setException(BulkMutation.toException(status));
            }
        }

        private void handleExtraFutures(List<MutateRowsResponse.Entry> entries) {
            Set indexes = BulkMutation.getIndexes(entries);
            long missingEntriesCount = 0L;
            for (int i = 0; i < this.getRequestCount(); ++i) {
                if (indexes.remove(i)) continue;
                ++missingEntriesCount;
                this.futures.get(i).setException(MISSING_ENTRY_EXCEPTION);
            }
            if (missingEntriesCount > 0L) {
                LOG.error("Missing %d responses for bulkWrite. Setting exceptions on the futures.", missingEntriesCount);
            }
        }

        private void run() {
            if (this.futures.isEmpty()) {
                this.setComplete();
                return;
            }
            Preconditions.checkState(!this.completionFuture.isDone(), "The Batch was already run");
            try {
                MutateRowsRequest request = this.builder.build();
                this.mutateRowsFuture = BulkMutation.this.client.mutateRowsAsync(request);
                this.lastRpcSentTimeNanos = BulkMutation.this.clock.nanoTime();
            }
            catch (Throwable e) {
                this.mutateRowsFuture = Futures.immediateFailedFuture(e);
            }
            finally {
                this.addCallback(this.mutateRowsFuture);
            }
            this.setupStalenessChecker();
        }

        private void setupStalenessChecker() {
            Runnable runnable = new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    Batch batch = Batch.this;
                    synchronized (batch) {
                        if (Batch.this.futures.isEmpty()) {
                            Batch.this.setComplete();
                        } else if (Batch.this.isStale()) {
                            Batch.this.setFailure(Status.INTERNAL.withDescription("Stale requests.").asRuntimeException());
                        } else {
                            Batch.this.setupStalenessChecker();
                        }
                    }
                }
            };
            if (this.lastRpcSentTimeNanos != null) {
                long delay = this.calculateTimeUntilStaleNanos();
                this.stalenessFuture = BulkMutation.this.retryExecutorService.schedule(runnable, delay, TimeUnit.NANOSECONDS);
            }
        }

        private long calculateTimeUntilStaleNanos() {
            return this.lastRpcSentTimeNanos + MAX_RPC_WAIT_TIME_NANOS - BulkMutation.this.clock.nanoTime();
        }

        @VisibleForTesting
        boolean isStale() {
            return this.lastRpcSentTimeNanos != null && this.calculateTimeUntilStaleNanos() <= 0L;
        }

        @VisibleForTesting
        boolean wasSent() {
            return this.lastRpcSentTimeNanos != null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void setFailure(Throwable t) {
            try {
                for (SettableFuture<MutateRowResponse> future : this.futures) {
                    future.setException(t);
                }
            }
            finally {
                this.setComplete();
            }
        }

        private synchronized void setComplete() {
            BulkMutation.cancelIfNotDone(this.stalenessFuture);
            BulkMutation.cancelIfNotDone(this.mutateRowsFuture);
            this.mutateRowsFuture = null;
            for (SettableFuture<MutateRowResponse> future : this.futures) {
                if (future.isDone()) continue;
                future.setException(MISSING_ENTRY_EXCEPTION);
            }
            this.futures.clear();
            if (!this.completionFuture.isDone()) {
                this.completionFuture.set("");
            }
        }

        @VisibleForTesting
        int getRequestCount() {
            return this.futures.size();
        }
    }
}

