/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.ApiServiceUtils;
import com.google.cloud.pubsublite.internal.wire.BatchPublisher;
import com.google.cloud.pubsublite.internal.wire.BatchPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.BatchPublisherImpl;
import com.google.cloud.pubsublite.internal.wire.RetryingConnection;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionImpl;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver;
import com.google.cloud.pubsublite.internal.wire.SerialBatcher;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.cloud.pubsublite.proto.PublishResponse;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Monitor;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;

/**
 * A {@link Publisher} that accumulates messages in a {@link SerialBatcher}, sends each batch over
 * a {@link RetryingConnection}, and completes every message's future with an {@link Offset} once
 * the matching response is delivered through {@link #onClientResponse(Offset)}.
 *
 * <p>All mutable state is protected by {@code monitor}. Batches are kept in {@code batchesInFlight}
 * in the order they were sent, and responses are matched against the head of that queue.
 */
public final class PublisherImpl extends ProxyService
        implements Publisher<Offset>, RetryingConnectionObserver<Offset> {
    @GuardedBy(value="monitor.monitor")
    private final RetryingConnection<BatchPublisher> connection;
    /** Runs the periodic {@link #flushToStream()} alarm. */
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    private final BatchingSettings batchingSettings;
    /** Handle of the periodic flush task; {@code null} until {@link #start()} and after cancellation. */
    @GuardedBy(value="monitor.monitor")
    private Future<?> alarmFuture;
    private final CloseableMonitor monitor = new CloseableMonitor();
    /** Satisfied when nothing is awaiting a response, or when the publisher has shut down. */
    private final Monitor.Guard noneInFlight = new Monitor.Guard(this.monitor.monitor) {
        @Override
        public boolean isSatisfied() {
            return PublisherImpl.this.batchesInFlight.isEmpty() || PublisherImpl.this.shutdown;
        }
    };
    @GuardedBy(value="monitor.monitor")
    private boolean shutdown = false;
    /** Offset of the last message of the most recently acknowledged batch, if any. */
    @GuardedBy(value="monitor.monitor")
    private Optional<Offset> lastSentOffset = Optional.empty();
    @GuardedBy(value="monitor.monitor")
    private final SerialBatcher batcher;
    /** Batches that were handed to the connection and have not been acknowledged yet (FIFO). */
    @GuardedBy(value="monitor.monitor")
    private final Queue<InFlightBatch> batchesInFlight = new ArrayDeque<>();

    /**
     * Creates a publisher on top of the given stream and batch-publisher factories.
     *
     * @param streamFactory factory handed to the retrying connection
     * @param publisherFactory factory handed to the retrying connection
     * @param initialRequest wrapped in a {@link PublishRequest} and handed to the connection
     * @param batchingSettings must have non-null delay, request-byte and element-count thresholds
     * @throws ApiException declared by the original signature
     * @throws NullPointerException if any of the three required thresholds is {@code null}
     */
    @VisibleForTesting
    PublisherImpl(StreamFactory<PublishRequest, PublishResponse> streamFactory, BatchPublisherFactory publisherFactory, InitialPublishRequest initialRequest, BatchingSettings batchingSettings) throws ApiException {
        Preconditions.checkNotNull(batchingSettings.getDelayThreshold());
        Preconditions.checkNotNull(batchingSettings.getRequestByteThreshold());
        Preconditions.checkNotNull(batchingSettings.getElementCountThreshold());
        this.connection = new RetryingConnectionImpl<PublishRequest, PublishResponse, Offset, BatchPublisher>(streamFactory, publisherFactory, PublishRequest.newBuilder().setInitialRequest(initialRequest).build(), this);
        this.batcher = new SerialBatcher(batchingSettings.getRequestByteThreshold(), batchingSettings.getElementCountThreshold());
        this.batchingSettings = batchingSettings;
        this.addServices(this.connection);
    }

    /**
     * Creates a publisher that opens its streams through {@code client} and registers the client
     * as a managed service of this publisher.
     *
     * @param client used to open publish streams via {@code publishCallable().splitCall(...)}
     * @param initialRequest wrapped in a {@link PublishRequest} and handed to the connection
     * @param batchingSettings must have non-null delay, request-byte and element-count thresholds
     * @throws ApiException declared by the original signature
     */
    public PublisherImpl(PublisherServiceClient client, InitialPublishRequest initialRequest, BatchingSettings batchingSettings) throws ApiException {
        this(responseStream -> client.publishCallable().splitCall(responseStream), new BatchPublisherImpl.Factory(), initialRequest, batchingSettings);
        this.addServices(ApiServiceUtils.backgroundResourceAsApiService(client));
    }

    /**
     * Reinitializes the connection and, if a connection is present afterwards, re-publishes every
     * batch that is still in flight, in queue order. A {@link CheckedApiException} raised along
     * the way is reported through {@code onPermanentError}.
     */
    @Override
    public void triggerReinitialize() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            this.connection.reinitialize();
            Queue<InFlightBatch> batches = this.batchesInFlight;
            this.connection.modifyConnection(connectionOr -> {
                if (!connectionOr.isPresent()) {
                    return;
                }
                batches.forEach(batch -> connectionOr.get().publish(batch.messages));
            });
        }
        catch (CheckedApiException e) {
            this.onPermanentError(e);
        }
    }

    /**
     * Marks the publisher as shut down, releases the flush alarm and fails every outstanding
     * publish with {@code error}.
     *
     * <p>The alarm and its executor are released here as well as in {@link #stop()} so that a
     * failed publisher does not keep its scheduler thread alive; both operations are safe to
     * repeat if {@code stop()} also runs.
     */
    @Override
    protected void handlePermanentError(CheckedApiException error) {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            this.shutdown = true;
            this.cancelAlarm();
            this.terminateOutstandingPublishes(error);
        }
    }

    /**
     * Schedules {@link #flushToStream()} to run repeatedly, using the batching delay threshold as
     * both the initial delay and the delay between runs.
     */
    @Override
    protected void start() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            long delayNanos = this.batchingSettings.getDelayThreshold().toNanos();
            this.alarmFuture = this.executorService.scheduleWithFixedDelay(this::flushToStream, delayNanos, delayNanos, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Stops the flush alarm, flushes what has been batched so far, marks the publisher as shut
     * down and flushes once more.
     */
    @Override
    protected void stop() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            this.cancelAlarm();
        }
        this.flush();
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            this.shutdown = true;
        }
        // With shutdown set, flushToStream() returns immediately and the noneInFlight guard is
        // already satisfied, so this call does not block; it is kept to preserve the call order.
        this.flush();
    }

    /**
     * Cancels the periodic flush task, if one was scheduled, and shuts the executor down.
     * Safe to call more than once and before {@link #start()}.
     */
    @GuardedBy(value="monitor.monitor")
    private void cancelAlarm() {
        if (this.alarmFuture != null) {
            // Do not interrupt a flush that is already running.
            this.alarmFuture.cancel(false);
            this.alarmFuture = null;
        }
        this.executorService.shutdown();
    }

    /**
     * Records a non-empty batch as in flight and publishes it on the current connection.
     *
     * @param batch messages to send; an empty collection is ignored
     * @throws CheckedApiException if no connection is present, or if thrown by the connection
     */
    @GuardedBy(value="monitor.monitor")
    private void processBatch(Collection<SerialBatcher.UnbatchedMessage> batch) throws CheckedApiException {
        if (batch.isEmpty()) {
            return;
        }
        InFlightBatch inFlightBatch = new InFlightBatch(batch);
        // Enqueue before sending so that a response can always be matched to its batch.
        this.batchesInFlight.add(inFlightBatch);
        this.connection.modifyConnection(connectionOr -> {
            CheckedApiPreconditions.checkState(connectionOr.isPresent(), "Published after the stream shut down.");
            connectionOr.get().publish(inFlightBatch.messages);
        });
    }

    /**
     * Fails the future of every in-flight message and of every message still waiting in the
     * batcher with {@code e}, then empties the in-flight queue.
     */
    @GuardedBy(value="monitor.monitor")
    private void terminateOutstandingPublishes(CheckedApiException e) {
        this.batchesInFlight.forEach(batch -> batch.messageFutures.forEach(future -> future.setException(e)));
        this.batcher.flush().forEach(m -> m.future().setException(e));
        this.batchesInFlight.clear();
    }

    /**
     * Adds {@code message} to the current batch and sends the batch if the batcher reports that
     * it should be flushed.
     *
     * @param message the message to publish
     * @return a future for the message's offset; if the publisher is not {@code RUNNING}, has
     *     shut down, or sending fails, the error is reported through {@code onPermanentError}
     *     and an already-failed future is returned
     */
    @Override
    public ApiFuture<Offset> publish(Message message) {
        PubSubMessage proto = message.toProto();
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            ApiService.State currentState = this.state();
            CheckedApiPreconditions.checkState(currentState == ApiService.State.RUNNING, String.format("Cannot publish when Publisher state is %s.", currentState.name()));
            CheckedApiPreconditions.checkState(!this.shutdown, "Published after the stream shut down.");
            ApiFuture<Offset> messageFuture = this.batcher.add(proto);
            if (this.batcher.shouldFlush()) {
                this.processBatch(this.batcher.flush());
            }
            return messageFuture;
        }
        catch (CheckedApiException e) {
            this.onPermanentError(e);
            return ApiFutures.immediateFailedFuture(e);
        }
    }

    /** Fails every outstanding publish with a {@code CANCELLED} error. */
    @Override
    public void cancelOutstandingPublishes() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            this.terminateOutstandingPublishes(new CheckedApiException("Cancelled by client.", StatusCode.Code.CANCELLED));
        }
    }

    /**
     * Sends whatever the batcher currently holds, unless the publisher has shut down. A
     * {@link CheckedApiException} is reported through {@code onPermanentError}.
     */
    @VisibleForTesting
    void flushToStream() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            if (this.shutdown) {
                return;
            }
            this.processBatch(this.batcher.flush());
        }
        catch (CheckedApiException e) {
            this.onPermanentError(e);
        }
    }

    /**
     * Sends the pending batch and then blocks, uninterruptibly, until no batch is in flight or
     * the publisher has shut down.
     */
    @Override
    public void flush() {
        this.flushToStream();
        // Entering the monitor with the guard is the wait itself; there is nothing to do inside.
        try (CloseableMonitor.Hold h = this.monitor.enterWhenUninterruptibly(this.noneInFlight)) {
        }
    }

    /**
     * Handles the response for the oldest in-flight batch: message {@code i} of that batch is
     * completed with offset {@code value + i}.
     *
     * @param value offset assigned to the first message of the batch
     * @throws CheckedApiException if no batch is in flight, or if {@code value} is not greater
     *     than the last offset assigned by a previous response
     */
    @Override
    public void onClientResponse(Offset value) throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            CheckedApiPreconditions.checkState(!this.batchesInFlight.isEmpty(), "Received publish response with no batches in flight.");
            if (this.lastSentOffset.isPresent() && this.lastSentOffset.get().value() >= value.value()) {
                throw new CheckedApiException(String.format("Received publish response with offset %s that is inconsistent with previous responses max %s", value, this.lastSentOffset.get()), StatusCode.Code.FAILED_PRECONDITION);
            }
            InFlightBatch batch = this.batchesInFlight.remove();
            this.lastSentOffset = Optional.of(Offset.of(value.value() + (long)batch.messages.size() - 1L));
            for (int i = 0; i < batch.messageFutures.size(); ++i) {
                Offset offset = Offset.of(value.value() + (long)i);
                batch.messageFutures.get(i).set(offset);
            }
        }
    }

    /** A sent batch: the messages and, at matching indices, the futures to complete for them. */
    private static class InFlightBatch {
        final List<PubSubMessage> messages;
        final List<SettableApiFuture<Offset>> messageFutures;

        InFlightBatch(Collection<SerialBatcher.UnbatchedMessage> toBatch) {
            this.messages = toBatch.stream().map(SerialBatcher.UnbatchedMessage::message).collect(Collectors.toList());
            this.messageFutures = toBatch.stream().map(SerialBatcher.UnbatchedMessage::future).collect(Collectors.toList());
        }
    }
}

