/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.stream.impl;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentOutputStream;
import io.pravega.client.segment.impl.SegmentOutputStreamFactory;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.Controller;
import io.pravega.client.stream.impl.Pinger;
import io.pravega.client.stream.impl.SegmentTransaction;
import io.pravega.client.stream.impl.SegmentTransactionImpl;
import io.pravega.client.stream.impl.StreamSegments;
import io.pravega.client.stream.impl.TxnSegments;
import io.pravega.common.concurrent.Futures;
import java.beans.ConstructorProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TransactionalEventStreamWriter} implementation that creates and re-attaches to
 * transactions on a single {@link Stream} through a {@link Controller}.
 *
 * <p>Each transaction handed out by this writer holds one {@link SegmentTransaction} per
 * segment; events are routed to a segment by routing key.
 *
 * @param <Type> type of the events written through this writer
 */
public class TransactionalEventStreamWriterImpl<Type>
implements TransactionalEventStreamWriter<Type> {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(TransactionalEventStreamWriterImpl.class);
    private final Stream stream;
    private final String writerId;
    private final Serializer<Type> serializer;
    private final SegmentOutputStreamFactory outputStreamFactory;
    private final Controller controller;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final EventWriterConfig config;
    private final Pinger pinger;

    /**
     * Creates a writer bound to {@code stream}.
     *
     * @param stream              stream the transactions are created on; must not be null
     * @param writerId            identifier passed to the controller on commit; must not be null
     * @param controller          controller used to create, query, commit and abort transactions; must not be null
     * @param outputStreamFactory factory for the per-segment transaction output streams; must not be null
     * @param serializer          serializer handed to every segment transaction; must not be null
     * @param config              writer configuration (supplies the transaction timeout)
     * @param executor            executor handed to the {@link Pinger} created by this writer
     * @throws NullPointerException if any of the arguments documented as non-null is null
     */
    TransactionalEventStreamWriterImpl(Stream stream, String writerId, Controller controller, SegmentOutputStreamFactory outputStreamFactory, Serializer<Type> serializer, EventWriterConfig config, ScheduledExecutorService executor) {
        this.stream = Preconditions.checkNotNull(stream);
        this.writerId = Preconditions.checkNotNull(writerId);
        this.controller = Preconditions.checkNotNull(controller);
        this.outputStreamFactory = Preconditions.checkNotNull(outputStreamFactory);
        this.serializer = Preconditions.checkNotNull(serializer);
        this.config = config;
        this.pinger = new Pinger(config, stream, controller, executor);
    }

    /**
     * Asks the controller to create a new transaction, opens a segment transaction for every
     * segment the controller returned and registers the transaction with the pinger.
     *
     * @return a handle to the newly created transaction
     * @throws RuntimeException if the controller call fails
     */
    @Override
    public Transaction<Type> beginTxn() {
        TxnSegments txnSegments = Futures.getAndHandleExceptions(this.controller.createTransaction(this.stream, this.config.getTransactionTimeoutTime()), RuntimeException::new);
        UUID txnId = txnSegments.getTxnId();
        StreamSegments segments = txnSegments.getStreamSegments();
        Map<Segment, SegmentTransaction<Type>> transactions = this.openSegmentTransactions(txnId, segments);
        this.pinger.startPing(txnId);
        return new TransactionImpl<>(this.writerId, txnId, transactions, segments, this.controller, this.stream, this.pinger);
    }

    /**
     * Returns a handle for an existing transaction.
     *
     * <p>If the controller does not report the transaction as {@link Transaction.Status#OPEN},
     * the returned handle is already closed: writes, flushes and commits on it fail with
     * {@link TxnFailedException} and {@code abort()} does nothing. Otherwise segment
     * transactions are opened against the stream's current segments. Unlike
     * {@link #beginTxn()}, this method does not start pinging the transaction.
     *
     * @param txId id of the transaction to attach to
     * @return a handle to the transaction
     * @throws RuntimeException if a controller call fails
     */
    @Override
    public Transaction<Type> getTxn(UUID txId) {
        // The current segments are fetched before the status check, matching the original call order.
        StreamSegments segments = Futures.getAndHandleExceptions(this.controller.getCurrentSegments(this.stream.getScope(), this.stream.getStreamName()), RuntimeException::new);
        Transaction.Status status = Futures.getAndHandleExceptions(this.controller.checkTransactionStatus(this.stream, txId), RuntimeException::new);
        if (status != Transaction.Status.OPEN) {
            return new TransactionImpl<>(this.writerId, txId, this.controller, this.stream);
        }
        Map<Segment, SegmentTransaction<Type>> transactions = this.openSegmentTransactions(txId, segments);
        return new TransactionImpl<>(this.writerId, txId, transactions, segments, this.controller, this.stream, this.pinger);
    }

    /**
     * Opens one segment transaction per segment in {@code segments}.
     *
     * <p>A single delegation token provider is created lazily from the first segment
     * encountered and shared by all output streams.
     */
    private Map<Segment, SegmentTransaction<Type>> openSegmentTransactions(UUID txnId, StreamSegments segments) {
        Map<Segment, SegmentTransaction<Type>> transactions = new HashMap<>();
        DelegationTokenProvider tokenProvider = null;
        for (Segment segment : segments.getSegments()) {
            if (tokenProvider == null) {
                tokenProvider = DelegationTokenProviderFactory.create(segments.getDelegationToken(), this.controller, segment);
            }
            SegmentOutputStream out = this.outputStreamFactory.createOutputStreamForTransaction(segment, txnId, this.config, tokenProvider);
            transactions.put(segment, new SegmentTransactionImpl<Type>(txnId, out, this.serializer));
        }
        return transactions;
    }

    /**
     * Closes the pinger. Only the first call has any effect; later calls return immediately.
     */
    @Override
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.pinger.close();
    }

    /**
     * @return the configuration this writer was constructed with
     */
    @Override
    public EventWriterConfig getConfig() {
        return this.config;
    }

    @SuppressFBWarnings(justification="generated code")
    public String toString() {
        return "TransactionalEventStreamWriterImpl(stream=" + this.stream + ", closed=" + this.closed + ")";
    }

    /**
     * Handle to a single transaction.
     *
     * <p>A handle is either live (built with the seven-argument constructor) or created
     * already closed (four-argument constructor, used for transactions that are not open).
     *
     * @param <Type> type of the events written into the transaction
     */
    private static class TransactionImpl<Type>
    implements Transaction<Type> {
        private final String writerId;
        @NonNull
        private final UUID txId;
        // null for handles created already closed
        private final Map<Segment, SegmentTransaction<Type>> inner;
        // null for handles created already closed
        private final StreamSegments segments;
        @NonNull
        private final Controller controller;
        @NonNull
        private final Stream stream;
        // null for handles created already closed
        private final Pinger pinger;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        /**
         * Creates a handle that is closed from the start. It has no segment transactions and
         * no pinger; only {@link #checkStatus()}, {@link #getTxnId()} and the no-op
         * {@link #abort()} are usable.
         */
        TransactionImpl(String writerId, UUID txId, Controller controller, Stream stream) {
            this.writerId = writerId;
            this.txId = txId;
            this.inner = null;
            this.segments = null;
            this.controller = controller;
            this.stream = stream;
            this.pinger = null;
            this.closed.set(true);
        }

        /**
         * Creates a live handle.
         *
         * @throws NullPointerException if {@code txId}, {@code controller} or {@code stream} is null
         */
        @ConstructorProperties(value={"writerId", "txId", "inner", "segments", "controller", "stream", "pinger"})
        @SuppressFBWarnings(justification="generated code")
        public TransactionImpl(String writerId, @NonNull UUID txId, Map<Segment, SegmentTransaction<Type>> inner, StreamSegments segments, @NonNull Controller controller, @NonNull Stream stream, Pinger pinger) {
            if (txId == null) {
                throw new NullPointerException("txId is marked @NonNull but is null");
            }
            if (controller == null) {
                throw new NullPointerException("controller is marked @NonNull but is null");
            }
            if (stream == null) {
                throw new NullPointerException("stream is marked @NonNull but is null");
            }
            this.writerId = writerId;
            this.txId = txId;
            this.inner = inner;
            this.segments = segments;
            this.controller = controller;
            this.stream = stream;
            this.pinger = pinger;
        }

        /**
         * Writes {@code event} using the transaction id's string form as the routing key.
         *
         * @throws TxnFailedException if this handle is closed or the segment write fails
         */
        @Override
        public void writeEvent(Type event) throws TxnFailedException {
            this.writeEvent(this.txId.toString(), event);
        }

        /**
         * Writes {@code event} to the segment transaction selected by {@code routingKey}.
         *
         * @throws NullPointerException if {@code routingKey} or {@code event} is null
         * @throws TxnFailedException   if this handle is closed or the segment write fails
         */
        @Override
        public void writeEvent(String routingKey, Type event) throws TxnFailedException {
            Preconditions.checkNotNull(routingKey);
            Preconditions.checkNotNull(event);
            this.throwIfClosed();
            Segment segment = this.segments.getSegmentForKey(routingKey);
            SegmentTransaction<Type> transaction = this.inner.get(segment);
            transaction.writeEvent(event);
        }

        /**
         * Commits the transaction without a timestamp ({@code null} is passed to the controller).
         *
         * @throws TxnFailedException if this handle is closed, a segment transaction fails to
         *                            close, or the controller commit fails
         */
        @Override
        public void commit() throws TxnFailedException {
            this.commitWithTimestamp(null);
        }

        /**
         * Commits the transaction, passing {@code timestamp} to the controller.
         *
         * @throws TxnFailedException if this handle is closed, a segment transaction fails to
         *                            close, or the controller commit fails
         */
        @Override
        public void commit(long timestamp) throws TxnFailedException {
            this.commitWithTimestamp(timestamp);
        }

        /**
         * Shared commit path: closes every segment transaction, asks the controller to commit,
         * then stops pinging and marks the handle closed. If any step throws, the later steps
         * are skipped, so the handle stays open and the pinger keeps running.
         */
        private void commitWithTimestamp(Long timestamp) throws TxnFailedException {
            this.throwIfClosed();
            for (SegmentTransaction<Type> tx : this.inner.values()) {
                tx.close();
            }
            Futures.getAndHandleExceptions(this.controller.commitTransaction(this.stream, this.writerId, timestamp, this.txId), TxnFailedException::new);
            this.pinger.stopPing(this.txId);
            this.closed.set(true);
        }

        /**
         * Aborts the transaction. Does nothing if this handle is already closed.
         *
         * <p>Pinging is stopped first; failures while closing individual segment transactions
         * are logged at debug level and otherwise ignored so the abort still reaches the
         * controller.
         *
         * @throws RuntimeException if the controller abort call fails
         */
        @Override
        public void abort() {
            if (this.closed.get()) {
                return;
            }
            this.pinger.stopPing(this.txId);
            for (SegmentTransaction<Type> tx : this.inner.values()) {
                try {
                    tx.close();
                }
                catch (TxnFailedException e) {
                    log.debug("Got exception while writing to transaction on abort: {}", e.getMessage());
                }
            }
            Futures.getAndHandleExceptions(this.controller.abortTransaction(this.stream, this.txId), RuntimeException::new);
            this.closed.set(true);
        }

        /**
         * Queries the controller for the transaction's status.
         *
         * @throws RuntimeException if the controller call fails
         */
        @Override
        public Transaction.Status checkStatus() {
            return Futures.getAndHandleExceptions(this.controller.checkTransactionStatus(this.stream, this.txId), RuntimeException::new);
        }

        /**
         * Flushes every segment transaction.
         *
         * @throws TxnFailedException if this handle is closed or a segment flush fails
         */
        @Override
        public void flush() throws TxnFailedException {
            this.throwIfClosed();
            for (SegmentTransaction<Type> tx : this.inner.values()) {
                tx.flush();
            }
        }

        /**
         * @return the id of this transaction
         */
        @Override
        public UUID getTxnId() {
            return this.txId;
        }

        /** Guards operations that require a live handle. */
        private void throwIfClosed() throws TxnFailedException {
            if (this.closed.get()) {
                throw new TxnFailedException();
            }
        }
    }
}

