/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.task.Stream;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.server.eventProcessor.ControllerEventProcessorConfig;
import io.pravega.controller.server.eventProcessor.ControllerEventProcessors;
import io.pravega.controller.server.rpc.auth.GrpcAuthHelper;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.TxnStatus;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.VersionedTransactionData;
import io.pravega.controller.store.stream.records.RecordHelper;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.task.TxnResource;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.controller.task.Stream.TaskStepsRetryHelper;
import io.pravega.controller.task.Stream.WriteFailedException;
import io.pravega.controller.timeout.TimeoutService;
import io.pravega.controller.timeout.TimeoutServiceConfig;
import io.pravega.controller.timeout.TimerWheelTimeoutService;
import io.pravega.controller.util.Config;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.controller.event.AbortEvent;
import io.pravega.shared.controller.event.CommitEvent;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamTransactionMetadataTasks
implements AutoCloseable {
    // Class-wide SLF4J logger.
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(StreamTransactionMetadataTasks.class);
    // Private monitor object; both initializeStreamWriters overloads and close()
    // synchronize on it.
    @SuppressFBWarnings(justification="generated code")
    private final Object $lock = new Object[0];
    // Factor (1000) relating a transaction's lease to its maximum execution period;
    // createTxnBody scales the lease by this amount and caps the result at one day.
    private static final int MAX_EXECUTION_TIME_MULTIPLIER = 1000;
    // Identifier of this host; used as the key for the host-txn index entries.
    protected final String hostId;
    // Executor on which the asynchronous continuations in this class are scheduled.
    protected final ScheduledExecutorService executor;
    private final StreamMetadataStore streamMetadataStore;
    private final SegmentHelper segmentHelper;
    // Source of the token returned by retrieveDelegationToken().
    private final GrpcAuthHelper authHelper;
    // Tracks leases of transactions handled by this instance; exposed via getTimeoutService().
    @VisibleForTesting
    private final TimeoutService timeoutService;
    // Set to true by setReady() once the event writers have been supplied.
    private volatile boolean ready;
    // Counted down by setReady(); awaitInitialization() blocks on it.
    private final CountDownLatch readyLatch;
    // Completed by initializeStreamWriters; cancelled by close() if still pending.
    private final CompletableFuture<EventStreamWriter<CommitEvent>> commitWriterFuture;
    private final CompletableFuture<EventStreamWriter<AbortEvent>> abortWriterFuture;

    /**
     * Creates the task object together with its {@link TimerWheelTimeoutService}.
     * The event writers are not available until one of the
     * {@code initializeStreamWriters} overloads has been called.
     *
     * @param streamMetadataStore  store holding stream and transaction metadata.
     * @param segmentHelper        helper used to talk to segment stores.
     * @param executor             executor for asynchronous continuations.
     * @param hostId               identifier of this host.
     * @param timeoutServiceConfig configuration handed to the timeout service.
     * @param taskCompletionQueue  queue handed to the timeout service; may be {@code null}.
     * @param authHelper           helper used to obtain delegation tokens.
     */
    @VisibleForTesting
    public StreamTransactionMetadataTasks(final StreamMetadataStore streamMetadataStore,
                                          final SegmentHelper segmentHelper,
                                          final ScheduledExecutorService executor,
                                          final String hostId,
                                          final TimeoutServiceConfig timeoutServiceConfig,
                                          final BlockingQueue<Optional<Throwable>> taskCompletionQueue,
                                          final GrpcAuthHelper authHelper) {
        this.streamMetadataStore = streamMetadataStore;
        this.segmentHelper = segmentHelper;
        this.authHelper = authHelper;
        this.executor = executor;
        this.hostId = hostId;
        // Note: 'this' is handed to the timeout service before the latch and the
        // writer futures below have been assigned.
        this.timeoutService = new TimerWheelTimeoutService(this, timeoutServiceConfig, taskCompletionQueue);
        this.readyLatch = new CountDownLatch(1);
        this.commitWriterFuture = new CompletableFuture<>();
        this.abortWriterFuture = new CompletableFuture<>();
    }

    /**
     * Creates the task object without a task-completion queue; {@code null} is
     * passed in its place to the full constructor.
     *
     * @param streamMetadataStore  store holding stream and transaction metadata.
     * @param segmentHelper        helper used to talk to segment stores.
     * @param executor             executor for asynchronous continuations.
     * @param hostId               identifier of this host.
     * @param timeoutServiceConfig configuration handed to the timeout service.
     * @param authHelper           helper used to obtain delegation tokens.
     */
    public StreamTransactionMetadataTasks(final StreamMetadataStore streamMetadataStore,
                                          final SegmentHelper segmentHelper,
                                          final ScheduledExecutorService executor,
                                          final String hostId,
                                          final TimeoutServiceConfig timeoutServiceConfig,
                                          final GrpcAuthHelper authHelper) {
        this(streamMetadataStore, segmentHelper, executor, hostId,
                timeoutServiceConfig, null, authHelper);
    }

    /**
     * Creates the task object with {@link TimeoutServiceConfig#defaultConfig()} and
     * no task-completion queue.
     *
     * @param streamMetadataStore store holding stream and transaction metadata.
     * @param segmentHelper       helper used to talk to segment stores.
     * @param executor            executor for asynchronous continuations.
     * @param hostId              identifier of this host.
     * @param authHelper          helper used to obtain delegation tokens.
     */
    public StreamTransactionMetadataTasks(final StreamMetadataStore streamMetadataStore,
                                          final SegmentHelper segmentHelper,
                                          final ScheduledExecutorService executor,
                                          final String hostId,
                                          final GrpcAuthHelper authHelper) {
        this(streamMetadataStore, segmentHelper, executor, hostId,
                TimeoutServiceConfig.defaultConfig(), null, authHelper);
    }

    /**
     * Marks this instance as ready and releases every thread blocked in
     * {@code awaitInitialization}.
     */
    private void setReady() {
        // Flag first, latch second: a thread released by the latch then observes ready == true.
        ready = true;
        readyLatch.countDown();
    }

    /**
     * @return {@code true} once {@code setReady()} has run, i.e. after the event
     *         writers have been supplied.
     */
    boolean isReady() {
        return ready;
    }

    /**
     * Waits, for at most the given time, until this instance has been marked ready.
     *
     * @param timeout  maximum time to wait.
     * @param timeUnit unit of {@code timeout}.
     * @return {@code true} if the ready latch reached zero, {@code false} if the
     *         wait timed out first.
     * @throws InterruptedException if the waiting thread is interrupted.
     */
    @VisibleForTesting
    public boolean awaitInitialization(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return readyLatch.await(timeout, timeUnit);
    }

    /**
     * Blocks without a time limit until this instance has been marked ready.
     *
     * @throws InterruptedException if the waiting thread is interrupted.
     */
    void awaitInitialization() throws InterruptedException {
        readyLatch.await();
    }

    /**
     * Creates the commit and abort event writers from the given client factory,
     * unless the corresponding future is already done, and then marks this
     * instance as ready. Runs while holding the instance lock.
     *
     * @param clientFactory factory used to create the event writers.
     * @param config        supplies the commit and abort stream names.
     */
    public void initializeStreamWriters(EventStreamClientFactory clientFactory, ControllerEventProcessorConfig config) {
        synchronized ($lock) {
            if (!commitWriterFuture.isDone()) {
                EventStreamWriter<CommitEvent> commitWriter = clientFactory.createEventWriter(
                        config.getCommitStreamName(),
                        ControllerEventProcessors.COMMIT_EVENT_SERIALIZER,
                        EventWriterConfig.builder().build());
                commitWriterFuture.complete(commitWriter);
            }
            if (!abortWriterFuture.isDone()) {
                EventStreamWriter<AbortEvent> abortWriter = clientFactory.createEventWriter(
                        config.getAbortStreamName(),
                        ControllerEventProcessors.ABORT_EVENT_SERIALIZER,
                        EventWriterConfig.builder().build());
                abortWriterFuture.complete(abortWriter);
            }
            setReady();
        }
    }

    /**
     * Supplies ready-made commit and abort writers and marks this instance as
     * ready. Runs while holding the instance lock. Completing a future that is
     * already done has no effect, so earlier writers are not replaced.
     *
     * @param commitWriter writer for commit events.
     * @param abortWriter  writer for abort events.
     */
    @VisibleForTesting
    public void initializeStreamWriters(EventStreamWriter<CommitEvent> commitWriter, EventStreamWriter<AbortEvent> abortWriter) {
        synchronized ($lock) {
            commitWriterFuture.complete(commitWriter);
            abortWriterFuture.complete(abortWriter);
            setReady();
        }
    }

    /**
     * Creates a transaction on the given stream.
     *
     * @param scope      scope name.
     * @param stream     stream name.
     * @param lease      requested lease; validated by {@code createTxnBody}.
     * @param contextOpt operation context, or {@code null} to have one created.
     * @return future holding the transaction data paired with segment records
     *         for the transaction.
     */
    public CompletableFuture<Pair<VersionedTransactionData, List<StreamSegmentRecord>>> createTxn(String scope, String stream, long lease, OperationContext contextOpt) {
        return createTxnBody(scope, stream, lease, getNonNullOperationContext(scope, stream, contextOpt));
    }

    /**
     * Pings a transaction to extend its lease.
     *
     * @param scope      scope name.
     * @param stream     stream name.
     * @param txId       transaction id.
     * @param lease      requested lease extension.
     * @param contextOpt operation context, or {@code null} to have one created.
     * @return future holding the ping status.
     */
    public CompletableFuture<Controller.PingTxnStatus> pingTxn(String scope, String stream, UUID txId, long lease, OperationContext contextOpt) {
        return pingTxnBody(scope, stream, txId, lease, getNonNullOperationContext(scope, stream, contextOpt));
    }

    /**
     * Aborts a transaction by sealing it with {@code commit == false}. The seal is
     * run through {@link RetryHelper#withRetriesAsync} with
     * {@code RetryHelper.RETRYABLE_PREDICATE} and a retry limit argument of 3.
     *
     * @param scope      scope name.
     * @param stream     stream name.
     * @param txId       transaction id.
     * @param version    expected transaction version; may be {@code null}.
     * @param contextOpt operation context, or {@code null} to have one created.
     * @return future holding the resulting transaction status.
     */
    public CompletableFuture<TxnStatus> abortTxn(String scope, String stream, UUID txId, Version version, OperationContext contextOpt) {
        final OperationContext ctx = getNonNullOperationContext(scope, stream, contextOpt);
        return RetryHelper.withRetriesAsync(
                () -> sealTxnBody(hostId, scope, stream, false, txId, version, ctx),
                RetryHelper.RETRYABLE_PREDICATE,
                3,
                executor);
    }

    /**
     * Commits a transaction without writer information: an empty writer id and
     * {@code Long.MIN_VALUE} as timestamp are passed to the seal step. The seal is
     * run through {@link RetryHelper#withRetriesAsync} with a retry limit argument of 3.
     *
     * @param scope      scope name.
     * @param stream     stream name.
     * @param txId       transaction id.
     * @param contextOpt operation context, or {@code null} to have one created.
     * @return future holding the resulting transaction status.
     */
    public CompletableFuture<TxnStatus> commitTxn(String scope, String stream, UUID txId, OperationContext contextOpt) {
        final OperationContext ctx = getNonNullOperationContext(scope, stream, contextOpt);
        return RetryHelper.withRetriesAsync(
                () -> sealTxnBody(hostId, scope, stream, true, txId, null, "", Long.MIN_VALUE, ctx),
                RetryHelper.RETRYABLE_PREDICATE,
                3,
                executor);
    }

    /**
     * Commits a transaction, forwarding the writer id and timestamp to the seal
     * step. The seal is run through {@link RetryHelper#withRetriesAsync} with a
     * retry limit argument of 3.
     *
     * @param scope      scope name.
     * @param stream     stream name.
     * @param txId       transaction id.
     * @param writerId   id of the writer committing the transaction.
     * @param timestamp  timestamp supplied by the writer.
     * @param contextOpt operation context, or {@code null} to have one created.
     * @return future holding the resulting transaction status.
     */
    public CompletableFuture<TxnStatus> commitTxn(String scope, String stream, UUID txId, String writerId, long timestamp, OperationContext contextOpt) {
        final OperationContext ctx = getNonNullOperationContext(scope, stream, contextOpt);
        return RetryHelper.withRetriesAsync(
                () -> sealTxnBody(hostId, scope, stream, true, txId, null, writerId, timestamp, ctx),
                RetryHelper.RETRYABLE_PREDICATE,
                3,
                executor);
    }

    /**
     * Validates the lease and then creates a transaction. Id generation plus the
     * creation pipeline is retried (retry limit argument 5) when the unwrapped
     * failure is a {@code WriteConflictException} or a {@code DataNotFoundException}.
     *
     * @param scope  scope name.
     * @param stream stream name.
     * @param lease  requested lease.
     * @param ctx    operation context.
     * @return future holding the transaction data paired with segment records
     *         for the transaction; fails with {@link IllegalArgumentException}
     *         when the lease is rejected by {@code validate}.
     */
    CompletableFuture<Pair<VersionedTransactionData, List<StreamSegmentRecord>>> createTxnBody(String scope, String stream, long lease, OperationContext ctx) {
        CompletableFuture<Void> validate = validate(lease);
        // Maximum execution period is a multiple of the lease, capped at one day.
        long maxExecutionPeriod = Math.min(MAX_EXECUTION_TIME_MULTIPLIER * lease, Duration.ofDays(1L).toMillis());
        return validate.thenCompose(validated -> RetryHelper.withRetriesAsync(
                () -> streamMetadataStore.generateTransactionId(scope, stream, ctx, executor)
                        .thenCompose(txnId -> createTxnWithId(scope, stream, lease, ctx, maxExecutionPeriod, txnId)),
                e -> {
                    Throwable unwrap = Exceptions.unwrap(e);
                    return unwrap instanceof StoreException.WriteConflictException
                            || unwrap instanceof StoreException.DataNotFoundException;
                },
                5,
                executor));
    }

    /**
     * Runs the creation pipeline for an already generated transaction id: adds
     * the host-txn index entry, creates the transaction in the store, notifies
     * the segment stores and registers the transaction with the timeout service.
     */
    private CompletableFuture<Pair<VersionedTransactionData, List<StreamSegmentRecord>>> createTxnWithId(String scope, String stream, long lease, OperationContext ctx, long maxExecutionPeriod, UUID txnId) {
        CompletableFuture<Void> addIndex = addTxnToIndex(scope, stream, txnId);
        CompletableFuture<VersionedTransactionData> txnFuture =
                createTxnInStore(scope, stream, lease, ctx, maxExecutionPeriod, txnId, addIndex);
        CompletableFuture<List<StreamSegmentRecord>> segmentsFuture = txnFuture.thenComposeAsync(
                txnData -> streamMetadataStore.getSegmentsInEpoch(scope, stream, txnData.getEpoch(), ctx, executor),
                executor);
        CompletableFuture<Void> notify = segmentsFuture
                .thenComposeAsync(activeSegments -> notifyTxnCreation(scope, stream, activeSegments, txnId), executor)
                .whenComplete((v, e) -> log.trace("Txn={}, notified segments stores", txnId));
        // The timeout service is told about the txn whether or not the earlier
        // steps succeeded; the final mapping only runs on success.
        return notify
                .whenCompleteAsync((result, ex) ->
                        addTxnToTimeoutService(scope, stream, lease, maxExecutionPeriod, txnId, txnFuture), executor)
                .thenApplyAsync(ignored -> lambda$null$8(segmentsFuture, txnId, txnFuture, ignored), executor);
    }

    /**
     * Registers a transaction with the timeout service. When the store creation
     * succeeded, the version and expiry time come from the created record;
     * otherwise a {@code null} version and "now + maxExecutionPeriod" are used.
     *
     * @param scope              scope name.
     * @param stream             stream name.
     * @param lease              lease of the transaction.
     * @param maxExecutionPeriod fallback execution period when creation failed.
     * @param txnId              transaction id.
     * @param txnFuture          future of the store creation; joined here, so it
     *                           is expected to be complete already.
     */
    private void addTxnToTimeoutService(String scope, String stream, long lease, long maxExecutionPeriod, UUID txnId, CompletableFuture<VersionedTransactionData> txnFuture) {
        final Version txnVersion;
        final long expiryTime;
        if (txnFuture.isCompletedExceptionally()) {
            txnVersion = null;
            expiryTime = System.currentTimeMillis() + maxExecutionPeriod;
        } else {
            VersionedTransactionData txnData = txnFuture.join();
            txnVersion = txnData.getVersion();
            expiryTime = txnData.getMaxExecutionExpiryTime();
        }
        timeoutService.addTxn(scope, stream, txnId, txnVersion, lease, expiryTime);
        log.trace("Txn={}, added to timeout service on host={}", txnId, hostId);
    }

    /**
     * Creates the transaction in the metadata store once {@code addIndex} has
     * completed, and logs the outcome at debug level.
     *
     * @param scope              scope name.
     * @param stream             stream name.
     * @param lease              lease of the transaction.
     * @param ctx                operation context.
     * @param maxExecutionPeriod maximum execution period passed to the store.
     * @param txnId              transaction id.
     * @param addIndex           future of the host-txn index update to wait for.
     * @return future holding the created transaction data.
     */
    private CompletableFuture<VersionedTransactionData> createTxnInStore(String scope, String stream, long lease, OperationContext ctx, long maxExecutionPeriod, UUID txnId, CompletableFuture<Void> addIndex) {
        return addIndex
                .thenComposeAsync(
                        ignore -> streamMetadataStore.createTransaction(scope, stream, txnId, lease, maxExecutionPeriod, ctx, executor),
                        executor)
                .whenComplete((txnData, failure) -> {
                    if (failure == null) {
                        log.debug("Txn={}, created in store", txnId);
                    } else {
                        log.debug("Txn={}, failed creating txn in store", txnId);
                    }
                });
    }

    /**
     * Adds the transaction to this host's host-txn index with a {@code null}
     * version and logs the outcome at debug level.
     *
     * @param scope  scope name.
     * @param stream stream name.
     * @param txnId  transaction id.
     * @return future that completes when the index update finishes.
     */
    private CompletableFuture<Void> addTxnToIndex(String scope, String stream, UUID txnId) {
        final TxnResource txnResource = new TxnResource(scope, stream, txnId);
        return streamMetadataStore.addTxnToIndex(hostId, txnResource, null).whenComplete((ignored, failure) -> {
            if (failure == null) {
                log.debug("Txn={}, added txn to host-txn index of host={}", txnId, hostId);
            } else {
                log.debug("Txn={}, failed adding txn to host-txn index of host={}", txnId, hostId);
            }
        });
    }

    /**
     * Checks the lease against {@code Config.MIN_LEASE_VALUE} and the timeout
     * service's maximum lease value.
     *
     * @param lease lease to check.
     * @return a completed future when the lease is within bounds, otherwise a
     *         future failed with {@link IllegalArgumentException}.
     */
    private CompletableFuture<Void> validate(long lease) {
        if (lease >= Config.MIN_LEASE_VALUE) {
            if (lease <= timeoutService.getMaxLeaseValue()) {
                return CompletableFuture.completedFuture(null);
            }
            return Futures.failedFuture(new IllegalArgumentException(
                    "lease value too large, max value is " + timeoutService.getMaxLeaseValue()));
        }
        return Futures.failedFuture(new IllegalArgumentException("lease should be greater than minimum lease"));
    }

    /**
     * Pings a transaction: reports {@code DISCONNECTED} when the timeout service
     * is not running, otherwise updates the transaction in the store and extends
     * its lease.
     *
     * @param scope  scope name.
     * @param stream stream name.
     * @param txnId  transaction id.
     * @param lease  requested lease extension.
     * @param ctx    operation context.
     * @return future holding the ping status.
     */
    CompletableFuture<Controller.PingTxnStatus> pingTxnBody(String scope, String stream, UUID txnId, long lease, OperationContext ctx) {
        if (timeoutService.isRunning()) {
            log.debug("Txn={}, updating txn node in store and extending lease", txnId);
            return fenceTxnUpdateLease(scope, stream, txnId, lease, ctx);
        }
        return CompletableFuture.completedFuture(createStatus(Controller.PingTxnStatus.Status.DISCONNECTED));
    }

    /**
     * Wraps a status value in a {@code PingTxnStatus} message.
     *
     * @param status status value to wrap.
     * @return the built message.
     */
    private Controller.PingTxnStatus createStatus(Controller.PingTxnStatus.Status status) {
        Controller.PingTxnStatus.Builder builder = Controller.PingTxnStatus.newBuilder();
        builder.setStatus(status);
        return builder.build();
    }

    /**
     * Reads the transaction from the store and, if it is still open and the
     * requested lease is acceptable, adds it to this host's index (when the
     * timeout service does not track it yet), pings it in the store and then
     * extends or starts its tracking in the timeout service.
     *
     * @param scope  scope name.
     * @param stream stream name.
     * @param txnId  transaction id.
     * @param lease  requested lease extension.
     * @param ctx    operation context.
     * @return future holding {@code OK} on success, the mapped status when the
     *         transaction is no longer open, {@code LEASE_TOO_LARGE} or
     *         {@code MAX_EXECUTION_TIME_EXCEEDED} when the lease is rejected.
     */
    private CompletableFuture<Controller.PingTxnStatus> fenceTxnUpdateLease(String scope, String stream, UUID txnId, long lease, OperationContext ctx) {
        return streamMetadataStore.getTransactionData(scope, stream, txnId, ctx, executor).thenComposeAsync(txnData -> {
            TxnStatus txnStatus = txnData.getStatus();
            if (!txnStatus.equals(TxnStatus.OPEN)) {
                return CompletableFuture.completedFuture(getPingTxnStatus(txnStatus));
            }
            if (lease > timeoutService.getMaxLeaseValue()) {
                return CompletableFuture.completedFuture(createStatus(Controller.PingTxnStatus.Status.LEASE_TOO_LARGE));
            }
            if (lease + System.currentTimeMillis() > txnData.getMaxExecutionExpiryTime()) {
                return CompletableFuture.completedFuture(createStatus(Controller.PingTxnStatus.Status.MAX_EXECUTION_TIME_EXCEEDED));
            }
            TxnResource resource = new TxnResource(scope, stream, txnId);
            // Only touch the host-txn index when the timeout service is not already tracking the txn.
            CompletableFuture<Void> addIndex = timeoutService.containsTxn(scope, stream, txnId)
                    ? CompletableFuture.completedFuture(null)
                    : streamMetadataStore.addTxnToIndex(hostId, resource, txnData.getVersion());
            addIndex.whenComplete((v, e) -> {
                if (e != null) {
                    log.debug("Txn={}, failed adding txn to host-txn index of host={}", txnId, hostId);
                } else {
                    log.debug("Txn={}, added txn to host-txn index of host={}", txnId, hostId);
                }
            });
            return addIndex.thenComposeAsync(
                    x -> pingTxnInStoreAndTimeoutService(scope, stream, txnId, lease, ctx, txnData), executor);
        }, executor);
    }

    /**
     * Pings the transaction in the store, then extends its lease in the timeout
     * service, or starts tracking it there if it is not tracked yet.
     */
    private CompletableFuture<Controller.PingTxnStatus> pingTxnInStoreAndTimeoutService(String scope, String stream, UUID txnId, long lease, OperationContext ctx, VersionedTransactionData txnData) {
        return streamMetadataStore.pingTransaction(scope, stream, txnData, lease, ctx, executor)
                .whenComplete((v, e) -> {
                    if (e != null) {
                        log.debug("Txn={}, failed updating txn node in store", txnId);
                    } else {
                        log.debug("Txn={}, updated txn node in store", txnId);
                    }
                })
                .thenApplyAsync(data -> {
                    Version version = data.getVersion();
                    long expiryTime = data.getMaxExecutionExpiryTime();
                    if (timeoutService.containsTxn(scope, stream, txnId)) {
                        log.debug("Txn={}, extending lease in timeout service", txnId);
                        timeoutService.pingTxn(scope, stream, txnId, version, lease);
                    } else {
                        log.debug("Txn={}, adding in timeout service", txnId);
                        timeoutService.addTxn(scope, stream, txnId, version, lease, expiryTime);
                    }
                    return createStatus(Controller.PingTxnStatus.Status.OK);
                }, executor);
    }

    /**
     * Maps a transaction status to the ping status reported for a transaction
     * that is no longer open.
     *
     * @param txnStatus status read from the store.
     * @return {@code COMMITTED} for committed/committing, {@code ABORTED} for
     *         aborted/aborting, {@code UNKNOWN} for anything else.
     */
    private Controller.PingTxnStatus getPingTxnStatus(TxnStatus txnStatus) {
        switch (txnStatus) {
            case COMMITTED:
            case COMMITTING:
                return createStatus(Controller.PingTxnStatus.Status.COMMITTED);
            case ABORTED:
            case ABORTING:
                return createStatus(Controller.PingTxnStatus.Status.ABORTED);
            default:
                return createStatus(Controller.PingTxnStatus.Status.UNKNOWN);
        }
    }

    /**
     * Seals a transaction without writer information; delegates to the full
     * overload with an empty writer id and {@code Long.MIN_VALUE} as timestamp.
     *
     * @param host    host whose index entry is removed after sealing.
     * @param scope   scope name.
     * @param stream  stream name.
     * @param commit  {@code true} to commit, {@code false} to abort.
     * @param txnId   transaction id.
     * @param version expected transaction version; may be {@code null}.
     * @param ctx     operation context.
     * @return future holding the resulting transaction status.
     */
    CompletableFuture<TxnStatus> sealTxnBody(String host, String scope, String stream, boolean commit, UUID txnId, Version version, OperationContext ctx) {
        final String noWriterId = "";
        final long noTimestamp = Long.MIN_VALUE;
        return sealTxnBody(host, scope, stream, commit, txnId, version, noWriterId, noTimestamp, ctx);
    }

    /**
     * Seals a transaction for commit or abort. Steps, in order:
     * <ol>
     *   <li>add the transaction to this host's index when {@code host} is this
     *       host and the timeout service does not track it yet;</li>
     *   <li>seal the transaction in the store;</li>
     *   <li>post a commit or abort event when the resulting status is
     *       {@code COMMITTING} or {@code ABORTING};</li>
     *   <li>remove the transaction from the timeout service and from the index
     *       of {@code host}.</li>
     * </ol>
     *
     * @param host      host whose index entry is removed after sealing.
     * @param scope     scope name.
     * @param stream    stream name.
     * @param commit    {@code true} to commit, {@code false} to abort.
     * @param txnId     transaction id.
     * @param version   expected transaction version; may be {@code null}.
     * @param writerId  id of the writer, passed through to the store.
     * @param timestamp writer timestamp, passed through to the store.
     * @param ctx       operation context.
     * @return future holding the status returned by the seal step.
     */
    CompletableFuture<TxnStatus> sealTxnBody(String host, String scope, String stream, boolean commit, UUID txnId, Version version, String writerId, long timestamp, OperationContext ctx) {
        TxnResource resource = new TxnResource(scope, stream, txnId);
        Optional<Version> versionOpt = Optional.ofNullable(version);
        CompletableFuture<Void> addIndex = host.equals(hostId) && !timeoutService.containsTxn(scope, stream, txnId)
                ? streamMetadataStore.addTxnToIndex(hostId, resource, version)
                : CompletableFuture.completedFuture(null);
        addIndex.whenComplete((v, e) -> {
            if (e != null) {
                // NOTE(review): this message is emitted on the failure path; confirm the wording is intended.
                log.debug("Txn={}, already present/newly added to host-txn index of host={}", txnId, hostId);
            } else {
                log.debug("Txn={}, added txn to host-txn index of host={}", txnId, hostId);
            }
        });
        return addIndex
                .thenComposeAsync(x -> streamMetadataStore.sealTransaction(
                        scope, stream, txnId, commit, versionOpt, writerId, timestamp, ctx, executor), executor)
                .whenComplete((v, e) -> {
                    if (e != null) {
                        log.debug("Txn={}, failed sealing txn", txnId);
                    } else {
                        log.debug("Txn={}, sealed successfully, commit={}", txnId, commit);
                    }
                })
                .thenComposeAsync(pair -> {
                    TxnStatus status = pair.getKey();
                    switch (status) {
                        case COMMITTING:
                            return writeCommitEvent(scope, stream, pair.getValue(), txnId, status);
                        case ABORTING:
                            return writeAbortEvent(scope, stream, pair.getValue(), txnId, status);
                        case ABORTED:
                        case COMMITTED:
                        default:
                            // Nothing to post; hand the status through unchanged.
                            return CompletableFuture.completedFuture(status);
                    }
                }, executor)
                .thenComposeAsync(status -> {
                    timeoutService.removeTxn(scope, stream, txnId);
                    log.debug("Txn={}, removed from timeout service", txnId);
                    // The entry is removed from the index of 'host', so that is the host the logs name.
                    return streamMetadataStore.removeTxnFromIndex(host, resource, true)
                            .whenComplete((v, e) -> {
                                if (e != null) {
                                    log.debug("Txn={}, failed removing txn from host-txn index of host={}", txnId, host);
                                } else {
                                    log.debug("Txn={}, removed txn from host-txn index of host={}", txnId, host);
                                }
                            })
                            .thenApply(x -> status);
                }, executor);
    }

    /**
     * Writes a commit event, keyed by {@code event.getKey()}, once the commit
     * writer is available.
     *
     * @param event commit event to post.
     * @return future that completes when the write completes.
     */
    public CompletableFuture<Void> writeCommitEvent(CommitEvent event) {
        return commitWriterFuture.thenCompose(writer -> {
            final String routingKey = event.getKey();
            return writer.writeEvent(routingKey, event);
        });
    }

    /**
     * Posts a commit event for the given scope, stream and epoch through
     * {@code TaskStepsRetryHelper.withRetries}. A failed write is rethrown as
     * {@link WriteFailedException}.
     *
     * @param scope  scope name.
     * @param stream stream name.
     * @param epoch  epoch carried by the commit event.
     * @param txnId  transaction id; used for logging only.
     * @param status status to return once the event has been posted.
     * @return future holding {@code status}.
     */
    CompletableFuture<TxnStatus> writeCommitEvent(String scope, String stream, int epoch, UUID txnId, TxnStatus status) {
        final CommitEvent commitEvent = new CommitEvent(scope, stream, epoch);
        return TaskStepsRetryHelper.withRetries(() -> writeCommitEvent(commitEvent).handle((ignored, failure) -> {
            if (failure == null) {
                log.debug("Transaction {} commit event posted", txnId);
                return status;
            }
            log.debug("Transaction {}, failed posting commit event. Retrying...", txnId);
            throw new WriteFailedException(failure);
        }), executor);
    }

    /**
     * Writes an abort event, keyed by {@code event.getKey()}, once the abort
     * writer is available.
     *
     * @param event abort event to post.
     * @return future that completes when the write completes.
     */
    public CompletableFuture<Void> writeAbortEvent(AbortEvent event) {
        return abortWriterFuture.thenCompose(writer -> {
            final String routingKey = event.getKey();
            return writer.writeEvent(routingKey, event);
        });
    }

    /**
     * Posts an abort event for the given transaction through
     * {@code TaskStepsRetryHelper.withRetries}. A failed write is rethrown as
     * {@link WriteFailedException}.
     *
     * @param scope  scope name.
     * @param stream stream name.
     * @param epoch  epoch carried by the abort event.
     * @param txnId  transaction id carried by the abort event.
     * @param status status to return once the event has been posted.
     * @return future holding {@code status}.
     */
    CompletableFuture<TxnStatus> writeAbortEvent(String scope, String stream, int epoch, UUID txnId, TxnStatus status) {
        final AbortEvent abortEvent = new AbortEvent(scope, stream, epoch, txnId);
        return TaskStepsRetryHelper.withRetries(() -> writeAbortEvent(abortEvent).handle((ignored, failure) -> {
            if (failure == null) {
                log.debug("Transaction {} abort event posted", txnId);
                return status;
            }
            log.debug("Transaction {}, failed posting abort event. Retrying...", txnId);
            throw new WriteFailedException(failure);
        }), executor);
    }

    /**
     * Notifies the segment store of every given segment about the new
     * transaction and combines the individual notifications into one future.
     *
     * @param scope    scope name.
     * @param stream   stream name.
     * @param segments segments to notify.
     * @param txnId    transaction id.
     * @return future that completes when all notifications have completed.
     */
    private CompletableFuture<Void> notifyTxnCreation(String scope, String stream, List<StreamSegmentRecord> segments, UUID txnId) {
        // Typed stream (no raw cast) so the element type of 'segment' is known.
        List<CompletableFuture<Void>> notifications = segments.stream()
                .parallel()
                .map(segment -> notifyTxnCreation(scope, stream, segment.segmentId(), txnId))
                .collect(Collectors.toList());
        return Futures.allOf(notifications);
    }

    /**
     * Asks the segment helper to create the transaction on a single segment,
     * through {@code TaskStepsRetryHelper.withRetries}. A delegation token is
     * retrieved on each attempt.
     *
     * @param scope     scope name.
     * @param stream    stream name.
     * @param segmentId id of the segment to notify.
     * @param txnId     transaction id.
     * @return future that completes when the notification completes.
     */
    private CompletableFuture<Void> notifyTxnCreation(String scope, String stream, long segmentId, UUID txnId) {
        return TaskStepsRetryHelper.withRetries(
                () -> segmentHelper.createTransaction(scope, stream, segmentId, txnId, retrieveDelegationToken()),
                executor);
    }

    /**
     * Returns the supplied context, or asks the store for a fresh one when none
     * was supplied.
     *
     * @param scope      scope name.
     * @param stream     stream name.
     * @param contextOpt caller-supplied context; may be {@code null}.
     * @return a non-null context as long as the store returns one.
     */
    private OperationContext getNonNullOperationContext(String scope, String stream, OperationContext contextOpt) {
        if (contextOpt != null) {
            return contextOpt;
        }
        return streamMetadataStore.createContext(scope, stream);
    }

    /**
     * @return the master token obtained from the auth helper.
     */
    public String retrieveDelegationToken() {
        final String token = authHelper.retrieveMasterToken();
        return token;
    }

    /**
     * Stops the timeout service, waits for it to terminate, and then releases
     * the event writers: a writer that was created is closed, a writer future
     * that is still pending is cancelled. Runs while holding the instance lock.
     *
     * <p>The writers are released even when stopping the timeout service throws,
     * and the abort writer is released even when releasing the commit writer
     * throws. Calling this method again after the writer futures were cancelled
     * does not fail.
     *
     * @throws Exception if stopping the timeout service or closing a writer fails.
     */
    @Override
    public void close() throws Exception {
        synchronized ($lock) {
            try {
                timeoutService.stopAsync();
                timeoutService.awaitTerminated();
            } finally {
                try {
                    closeOrCancel(commitWriterFuture);
                } finally {
                    closeOrCancel(abortWriterFuture);
                }
            }
        }
    }

    /**
     * Closes the writer held by a successfully completed future, cancels a future
     * that is still pending, and does nothing for a cancelled or failed future.
     */
    private static <T> void closeOrCancel(CompletableFuture<EventStreamWriter<T>> writerFuture) {
        if (!writerFuture.isDone()) {
            writerFuture.cancel(true);
        } else if (!writerFuture.isCompletedExceptionally()) {
            // Close through a dependent stage so a failing close() surfaces from join(),
            // as it did before.
            writerFuture.thenAccept(EventStreamWriter::close).join();
        }
    }

    /**
     * @return the timeout service created by the constructor.
     */
    @SuppressFBWarnings(justification="generated code")
    public TimeoutService getTimeoutService() {
        return timeoutService;
    }

    /**
     * Builds the result of a transaction creation: the created transaction data
     * paired with one record per segment of the epoch. Each record takes its
     * creation epoch and segment number from the generalized segment id derived
     * from the original segment id and the transaction id; creation time and key
     * range are copied from the original record.
     *
     * @param segmentsFuture completed future holding the epoch's segments.
     * @param txnId          transaction id.
     * @param txnFuture      completed future holding the created transaction data.
     * @param v              unused result of the preceding stage.
     * @return pair of transaction data and derived segment records.
     */
    private static Pair<VersionedTransactionData, List<StreamSegmentRecord>> lambda$null$8(CompletableFuture<List<StreamSegmentRecord>> segmentsFuture, UUID txnId, CompletableFuture<VersionedTransactionData> txnFuture, Void v) {
        List<StreamSegmentRecord> segments = segmentsFuture.join().stream().map(x -> {
            long generalizedSegmentId = RecordHelper.generalizedSegmentId(x.segmentId(), txnId);
            int epoch = StreamSegmentNameUtils.getEpoch(generalizedSegmentId);
            int segmentNumber = StreamSegmentNameUtils.getSegmentNumber(generalizedSegmentId);
            return StreamSegmentRecord.builder()
                    .creationEpoch(epoch)
                    .segmentNumber(segmentNumber)
                    .creationTime(x.getCreationTime())
                    .keyStart(x.getKeyStart())
                    .keyEnd(x.getKeyEnd())
                    .build();
        }).collect(Collectors.toList());
        return new ImmutablePair<>(txnFuture.join(), segments);
    }
}

