/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.common.Exceptions;
import io.pravega.connectors.flink.AbstractStreamingWriterBuilder;
import io.pravega.connectors.flink.PravegaEventRouter;
import io.pravega.connectors.flink.PravegaWriterMode;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkPravegaWriter<T>
extends RichSinkFunction<T>
implements ListCheckpointed<PendingTransaction>,
CheckpointListener {
    /** Logger shared by this sink and its nested internal writers. */
    private static final Logger log = LoggerFactory.getLogger(FlinkPravegaWriter.class);
    private static final long serialVersionUID = 1L;
    /** Name of the metric group that is added below the operator's metric group. */
    private static final String PRAVEGA_WRITER_METRICS_GROUP = "PravegaWriter";
    /** Name of the gauge that exposes the scoped stream name. */
    private static final String SCOPED_STREAM_METRICS_GAUGE = "stream";
    /** When true, {@code open} registers the stream-name gauge. */
    final boolean enableMetrics;
    /** Internal writer doing the actual work; created lazily and never serialized. */
    @VisibleForTesting
    transient AbstractInternalWriter writer = null;
    /** Pravega client configuration used for every client factory this sink creates. */
    final ClientConfig clientConfig;
    /** Converts events to the bytes that are handed to Pravega. */
    final SerializationSchema<T> serializationSchema;
    /** Supplies the routing key per event; not null-checked by the constructor, and a null router means events are written without a key. */
    final PravegaEventRouter<T> eventRouter;
    /** Destination stream (scope and stream name). */
    @SuppressFBWarnings(value={"SE_BAD_FIELD"})
    final Stream stream;
    /** Passed to {@code EventWriterConfig.transactionTimeoutTime}; validated to be positive in the constructor. */
    private final long txnLeaseRenewalPeriod;
    /** Selects between the transactional (EXACTLY_ONCE) and the non-transactional internal writer. */
    private final PravegaWriterMode writerMode;
    /** Whether Flink watermarks are forwarded to the internal writer. */
    private final boolean enableWatermark;
    /** Random UUID string generated in the constructor; prefix of the id returned by {@code writerId()}. */
    private final String writerIdPrefix;
    /** Client factory for the stream's scope; created together with the internal writer and closed in {@code close}. */
    private transient EventStreamClientFactory clientFactory = null;

    /**
     * Creates the sink. A fresh random UUID is generated as the writer-id prefix.
     *
     * @param clientConfig          Pravega client configuration; must not be null.
     * @param stream                destination stream; must not be null.
     * @param serializationSchema   schema used to serialize events; must not be null.
     * @param eventRouter           routing-key provider; stored as given and may be null.
     * @param writerMode            writer mode; must not be null.
     * @param txnLeaseRenewalPeriod transaction timeout handed to the writer config; must be &gt; 0.
     * @param enableWatermark       whether watermarks are forwarded to the internal writer.
     * @param enableMetrics         whether metrics are registered when the sink is opened.
     * @throws NullPointerException     if a required argument is null.
     * @throws IllegalArgumentException if {@code txnLeaseRenewalPeriod} is not positive.
     */
    protected FlinkPravegaWriter(ClientConfig clientConfig, Stream stream, SerializationSchema<T> serializationSchema, PravegaEventRouter<T> eventRouter, PravegaWriterMode writerMode, long txnLeaseRenewalPeriod, boolean enableWatermark, boolean enableMetrics) {
        // Validation order is part of the observable behaviour (which failure is reported first).
        this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig");
        this.stream = Preconditions.checkNotNull(stream, "stream");
        this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
        this.writerMode = Preconditions.checkNotNull(writerMode, "writerMode");
        Preconditions.checkArgument(txnLeaseRenewalPeriod > 0L, (Object) "txnLeaseRenewalPeriod must be > 0");

        this.eventRouter = eventRouter;
        this.txnLeaseRenewalPeriod = txnLeaseRenewalPeriod;
        this.enableWatermark = enableWatermark;
        this.enableMetrics = enableMetrics;
        this.writerIdPrefix = UUID.randomUUID().toString();
    }

    /**
     * Returns the event router this sink was built with.
     *
     * @return the configured router, or null when none was supplied.
     */
    public PravegaEventRouter<T> getEventRouter() {
        return eventRouter;
    }

    /**
     * Returns the writer mode this sink was built with.
     *
     * @return the configured writer mode.
     */
    PravegaWriterMode getPravegaWriterMode() {
        return writerMode;
    }

    /**
     * Tells whether watermark forwarding is switched on.
     *
     * @return the value of the watermark flag given at construction.
     */
    boolean getEnableWatermark() {
        return enableWatermark;
    }

    /**
     * Makes sure the internal writer exists, opens it, and registers metrics when enabled.
     *
     * @param parameters unused by this method.
     * @throws Exception if creating or opening the internal writer fails.
     */
    public void open(Configuration parameters) throws Exception {
        initializeInternalWriter();
        writer.open();
        log.info("Initialized Pravega writer {} for stream: {} with controller URI: {}", writerId(), stream, clientConfig.getControllerURI());
        if (!enableMetrics) {
            return;
        }
        registerMetrics();
    }

    /**
     * Hands one event to the internal writer together with the watermark flag.
     *
     * @param event   the event to write.
     * @param context sink context forwarded to the internal writer.
     * @throws Exception whatever the internal writer throws.
     */
    public void invoke(T event, SinkFunction.Context context) throws Exception {
        final AbstractInternalWriter target = writer;
        target.write(event, context, enableWatermark);
    }

    /**
     * Closes the internal writer and then the client factory, whichever of them exist.
     * Both are always attempted; the first failure is rethrown with the later one attached
     * as suppressed.
     *
     * @throws Exception the first failure encountered while closing.
     */
    public void close() throws Exception {
        Exception firstFailure = null;

        if (writer != null) {
            try {
                writer.close();
            } catch (Exception e) {
                // Nothing can have failed before this point, so this is the first failure.
                firstFailure = e;
            }
        }

        if (clientFactory != null) {
            try {
                clientFactory.close();
            } catch (Exception e) {
                firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Adds the writer metric group to the operator's metric group and registers a gauge
     * in it that reports the scoped name of the destination stream.
     */
    private void registerMetrics() {
        final MetricGroup writerGroup = getRuntimeContext()
                .getMetricGroup()
                .addGroup(PRAVEGA_WRITER_METRICS_GROUP);
        final StreamNameGauge streamGauge = new StreamNameGauge(stream.getScopedName());
        writerGroup.gauge(SCOPED_STREAM_METRICS_GAUGE, (Gauge) streamGauge);
    }

    /**
     * Delegates the state snapshot to the internal writer.
     *
     * @param checkpointId   id of the checkpoint being taken.
     * @param checkpointTime timestamp of the checkpoint.
     * @return the pending transactions reported by the internal writer.
     * @throws Exception whatever the internal writer throws.
     */
    public List<PendingTransaction> snapshotState(long checkpointId, long checkpointTime) throws Exception {
        final List<PendingTransaction> pending = writer.snapshotState(checkpointId, checkpointTime, enableWatermark);
        return pending;
    }

    /**
     * Creates the internal writer if it does not exist yet and lets it process the
     * restored pending transactions.
     *
     * @param pendingTransactionList transactions recorded in the restored checkpoint.
     * @throws Exception whatever writer creation or the internal writer throws.
     */
    public void restoreState(List<PendingTransaction> pendingTransactionList) throws Exception {
        initializeInternalWriter();
        final AbstractInternalWriter target = writer;
        target.restoreState(pendingTransactionList);
    }

    /**
     * Forwards the checkpoint-complete notification to the internal writer.
     *
     * @param checkpointId id of the completed checkpoint.
     * @throws Exception whatever the internal writer throws.
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        final AbstractInternalWriter target = writer;
        target.notifyCheckpointComplete(checkpointId);
    }

    /**
     * Creates a Pravega client factory for the given scope. Overridable so tests can
     * substitute their own factory.
     *
     * @param scopeName    scope the factory is bound to.
     * @param clientConfig client configuration to use.
     * @return a new client factory.
     */
    @VisibleForTesting
    protected EventStreamClientFactory createClientFactory(String scopeName, ClientConfig clientConfig) {
        final EventStreamClientFactory factory = EventStreamClientFactory.withScope(scopeName, clientConfig);
        return factory;
    }

    /**
     * Builds the internal writer matching the configured mode: a transactional writer for
     * EXACTLY_ONCE, otherwise a non-transactional writer backed by a new executor service.
     *
     * @return the newly created internal writer.
     * @throws IllegalStateException if the client factory has not been created yet.
     */
    @VisibleForTesting
    protected AbstractInternalWriter createInternalWriter() {
        Preconditions.checkState(clientFactory != null, (Object) "clientFactory not initialized");
        return writerMode == PravegaWriterMode.EXACTLY_ONCE
                ? new TransactionalWriter(clientFactory)
                : new NonTransactionalWriter(clientFactory, createExecutorService());
    }

    /**
     * Creates the executor handed to the non-transactional writer. Overridable so tests
     * can supply a different executor.
     *
     * @return a new single-threaded executor service.
     */
    @VisibleForTesting
    protected ExecutorService createExecutorService() {
        final ExecutorService singleThread = Executors.newSingleThreadExecutor();
        return singleThread;
    }

    /**
     * Creates the client factory and the internal writer on first use; later calls are no-ops.
     *
     * @throws UnsupportedOperationException if EXACTLY_ONCE mode is requested while
     *                                       checkpointing is disabled.
     */
    private void initializeInternalWriter() {
        if (writer == null) {
            final boolean exactlyOnce = writerMode == PravegaWriterMode.EXACTLY_ONCE;
            if (exactlyOnce && !isCheckpointEnabled()) {
                throw new UnsupportedOperationException("Enable checkpointing to use the exactly-once writer mode.");
            }
            // The factory must exist before the writer, which is built on top of it.
            clientFactory = createClientFactory(stream.getScope(), clientConfig);
            writer = createInternalWriter();
        }
    }

    /**
     * Asks the runtime context whether checkpointing is enabled. The context is cast to
     * {@code StreamingRuntimeContext}, so any other context type fails with a ClassCastException.
     *
     * @return true when checkpointing is enabled.
     */
    private boolean isCheckpointEnabled() {
        final StreamingRuntimeContext streamingContext = (StreamingRuntimeContext) getRuntimeContext();
        return streamingContext.isCheckpointingEnabled();
    }

    /**
     * Builds the writer id: the random prefix, a dash, and the index of this subtask.
     *
     * @return the writer id for this subtask.
     */
    protected String writerId() {
        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        return writerIdPrefix + "-" + subtaskIndex;
    }

    /**
     * Starts building a {@code FlinkPravegaWriter}.
     *
     * @param <T> type of the events the writer will accept.
     * @return a new, empty builder.
     */
    public static <T> Builder<T> builder() {
        final Builder<T> fresh = new Builder<>();
        return fresh;
    }

    /**
     * Fluent builder for {@link FlinkPravegaWriter}. The serialization schema is mandatory,
     * the event router is optional.
     *
     * @param <T> type of the events the built writer accepts.
     */
    public static class Builder<T> extends AbstractStreamingWriterBuilder<T, Builder<T>> {
        private SerializationSchema<T> serializationSchema;
        private PravegaEventRouter<T> eventRouter;

        /** Returns this builder; used to keep the fluent calls returning the concrete type. */
        @Override
        protected Builder<T> builder() {
            return this;
        }

        /**
         * Sets the schema used to serialize events.
         *
         * @param serializationSchema the schema to use.
         * @return this builder.
         */
        public Builder<T> withSerializationSchema(SerializationSchema<T> serializationSchema) {
            this.serializationSchema = serializationSchema;
            return builder();
        }

        /**
         * Sets the router that supplies routing keys.
         *
         * @param eventRouter the router to use.
         * @return this builder.
         */
        public Builder<T> withEventRouter(PravegaEventRouter<T> eventRouter) {
            this.eventRouter = eventRouter;
            return builder();
        }

        /**
         * Creates the sink from the collected settings.
         *
         * @return the configured writer.
         * @throws IllegalStateException if no serialization schema was supplied.
         */
        public FlinkPravegaWriter<T> build() {
            final boolean schemaSupplied = serializationSchema != null;
            Preconditions.checkState(schemaSupplied, (Object) "Serialization schema must be supplied.");
            return createSinkFunction(serializationSchema, eventRouter);
        }
    }

    /**
     * Serializable description of a transaction that is stored in a checkpoint:
     * transaction id, scope, stream name and an optional watermark.
     */
    static class PendingTransaction implements Serializable {
        private final UUID uuid;
        private final String scope;
        private final String stream;
        /** May be null; the constructor does not check it. */
        private final Long watermark;

        /**
         * @param uuid      transaction id; must not be null.
         * @param scope     scope name; must not be null.
         * @param stream    stream name; must not be null.
         * @param watermark watermark attached to the transaction, or null.
         * @throws NullPointerException if uuid, scope or stream is null.
         */
        public PendingTransaction(UUID uuid, String scope, String stream, Long watermark) {
            this.uuid = Preconditions.checkNotNull(uuid, "UUID");
            this.scope = Preconditions.checkNotNull(scope, "scope");
            this.stream = Preconditions.checkNotNull(stream, "stream");
            this.watermark = watermark;
        }

        /** @return the transaction id. */
        public UUID getUuid() {
            return uuid;
        }

        /** @return the scope name. */
        public String getScope() {
            return scope;
        }

        /** @return the stream name. */
        public String getStream() {
            return stream;
        }

        /** @return the watermark, or null when none was recorded. */
        public Long getWatermark() {
            return watermark;
        }
    }

    /**
     * Internal writer used for every mode other than EXACTLY_ONCE. Events are written
     * asynchronously; the number of writes whose completion callback has not run yet is
     * tracked so that {@link #flushAndVerify()} can wait for all of them. The first
     * asynchronous failure is remembered and surfaced by the next write, flush or close.
     */
    @VisibleForTesting
    class NonTransactionalWriter extends AbstractInternalWriter {
        /** First asynchronous write failure not yet reported to the caller, or null. */
        @VisibleForTesting
        final AtomicReference<Throwable> writeError;
        /** Number of writes issued whose completion callback has not run yet. */
        @VisibleForTesting
        final AtomicInteger pendingWritesCount;
        /** Executor on which the completion callbacks run; shut down in {@link #close()}. */
        private final ExecutorService executorService;

        /**
         * @param clientFactory   factory used by the base class to create the event writer.
         * @param executorService executor for the write-completion callbacks.
         */
        NonTransactionalWriter(EventStreamClientFactory clientFactory, ExecutorService executorService) {
            super(clientFactory, false);
            this.writeError = new AtomicReference<>(null);
            this.pendingWritesCount = new AtomicInteger(0);
            this.executorService = executorService;
        }

        /** Nothing to do: the event writer is created by the base-class constructor. */
        @Override
        public void open() throws Exception {
        }

        /**
         * Writes one event asynchronously, keyed by the event router when one is configured,
         * and notes the current watermark on the Pravega writer when watermarking applies.
         *
         * @param event           the event to write.
         * @param context         sink context providing timestamp and watermark.
         * @param enableWatermark whether watermark propagation is enabled.
         * @throws IOException if an earlier asynchronous write failed.
         */
        @Override
        public void write(T event, SinkFunction.Context context, boolean enableWatermark) throws Exception {
            checkWriteError();
            pendingWritesCount.incrementAndGet();
            try {
                final CompletableFuture<Void> future = eventRouter != null
                        ? getPravegaWriter().writeEvent(eventRouter.getRoutingKey(event), event)
                        : getPravegaWriter().writeEvent(event);
                if (enableWatermark && shouldEmitWatermark(context)) {
                    getPravegaWriter().noteTime(context.currentWatermark());
                    setWatermark(context.currentWatermark());
                }
                future.whenCompleteAsync((result, e) -> {
                    if (e != null) {
                        // Pass the throwable as the exception argument so the stack trace is logged.
                        log.warn("Detected a write failure", e);
                        // Keep only the first failure; later ones are dropped.
                        writeError.compareAndSet(null, e);
                    }
                    completePendingWrite();
                }, executorService);
            } catch (RuntimeException | Error syncFailure) {
                // No completion callback was registered for this write, so nothing else will
                // ever decrement the counter; roll it back or flushAndVerify() would block forever.
                completePendingWrite();
                throw syncFailure;
            }
        }

        /**
         * Waits for outstanding writes, closes the Pravega writer and shuts the executor down.
         * All three steps are attempted; the first failure is rethrown with later ones suppressed.
         *
         * @throws Exception the first failure encountered.
         */
        @Override
        public void close() throws Exception {
            Exception exception = null;
            try {
                flushAndVerify();
            } catch (Exception e) {
                exception = e;
            }
            try {
                super.close();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
            try {
                executorService.shutdown();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
            if (exception != null) {
                throw exception;
            }
        }

        /**
         * Waits until every pending write has completed; this writer has no transactions to record.
         *
         * @return a new empty list.
         * @throws Exception if flushing fails or an asynchronous write failed.
         */
        @Override
        public List<PendingTransaction> snapshotState(long checkpointId, long checkpointTime, boolean enableWatermark) throws Exception {
            log.debug("Snapshot triggered, wait for all pending writes to complete");
            flushAndVerify();
            return new ArrayList<PendingTransaction>();
        }

        /** No-op: there is nothing to commit for this writer. */
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
        }

        /** No-op: this writer stores no state in checkpoints. */
        @Override
        public void restoreState(List<PendingTransaction> pendingTransactionList) throws Exception {
        }

        /**
         * Flushes the Pravega writer, blocks until the pending-write counter reaches zero and
         * then reports any recorded asynchronous failure.
         *
         * @throws IOException          if an asynchronous write failed.
         * @throws InterruptedException if the thread is interrupted while waiting.
         */
        @VisibleForTesting
        void flushAndVerify() throws Exception {
            getPravegaWriter().flush();
            synchronized (this) {
                // Re-check in a loop: each completion callback wakes this thread up once.
                while (pendingWritesCount.get() > 0) {
                    wait();
                }
            }
            checkWriteError();
        }

        /** Marks one write as finished and wakes up a thread waiting in {@link #flushAndVerify()}. */
        private void completePendingWrite() {
            synchronized (this) {
                pendingWritesCount.decrementAndGet();
                notify();
            }
        }

        /**
         * Throws if an asynchronous failure was recorded, clearing it so it is reported once.
         *
         * @throws IOException wrapping the recorded failure.
         */
        private void checkWriteError() throws Exception {
            final Throwable error = writeError.getAndSet(null);
            if (error != null) {
                throw new IOException("Write failure", error);
            }
        }
    }

    /**
     * Internal writer used in EXACTLY_ONCE mode. Events go into the current transaction; a
     * snapshot flushes that transaction, queues it together with its checkpoint id and
     * starts a new one; queued transactions are committed once a checkpoint with the same
     * or a higher id is reported complete.
     */
    @VisibleForTesting
    class TransactionalWriter extends AbstractInternalWriter {
        /** Transaction currently receiving events; set in {@link #open()} and replaced on each snapshot. */
        @VisibleForTesting
        Transaction<T> currentTxn;
        /** Flushed transactions waiting for their checkpoint to complete, oldest first. */
        @VisibleForTesting
        final ArrayDeque<TransactionAndCheckpoint<T>> txnsPendingCommit;

        /**
         * @param clientFactory factory used by the base class to create the transactional writer.
         */
        TransactionalWriter(EventStreamClientFactory clientFactory) {
            super(clientFactory, true);
            this.txnsPendingCommit = new ArrayDeque<>();
        }

        /** Begins the first transaction. */
        @Override
        public void open() throws Exception {
            this.currentTxn = getPravegaTxnWriter().beginTxn();
            log.debug("{} - started first transaction '{}'", writerId(), this.currentTxn.getTxnId());
        }

        /**
         * Writes the event into the current transaction, keyed by the event router when one
         * is configured, and remembers the current watermark when watermarking is enabled.
         */
        @Override
        public void write(T event, SinkFunction.Context context, boolean enableWatermark) throws Exception {
            if (eventRouter != null) {
                this.currentTxn.writeEvent(eventRouter.getRoutingKey(event), event);
            } else {
                this.currentTxn.writeEvent(event);
            }
            if (enableWatermark) {
                setWatermark(context.currentWatermark());
            }
        }

        /**
         * Aborts the current transaction (if any) and closes the Pravega writer. Both steps
         * are attempted; the first failure is rethrown with the later one suppressed.
         *
         * @throws Exception the first failure encountered.
         */
        @Override
        public void close() throws Exception {
            Exception exception = null;
            final Transaction<T> txn = this.currentTxn;
            if (txn != null) {
                try {
                    Exceptions.handleInterrupted(txn::abort);
                } catch (Exception e) {
                    exception = e;
                }
            }
            try {
                super.close();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
            if (exception != null) {
                throw exception;
            }
        }

        /**
         * Flushes the current transaction, queues it for commit under the given checkpoint
         * id (with the last seen watermark when watermarking is enabled) and begins a new one.
         *
         * @return one {@link PendingTransaction} per queued transaction, oldest first.
         * @throws IllegalStateException if there is no current transaction.
         */
        @Override
        public List<PendingTransaction> snapshotState(long checkpointId, long checkpointTime, boolean enableWatermark) throws Exception {
            final Transaction<T> txn = this.currentTxn;
            Preconditions.checkState(txn != null, (Object) "bug: no transaction object when performing state snapshot");
            log.debug("{} - checkpoint {} triggered, flushing transaction '{}'", writerId(), checkpointId, txn.getTxnId());
            txn.flush();

            if (enableWatermark) {
                this.txnsPendingCommit.addLast(new TransactionAndCheckpoint<>(txn, checkpointId, getWatermark()));
            } else {
                this.txnsPendingCommit.addLast(new TransactionAndCheckpoint<>(txn, checkpointId));
            }

            this.currentTxn = getPravegaTxnWriter().beginTxn();
            log.debug("{} - started new transaction '{}'", writerId(), this.currentTxn.getTxnId());
            log.debug("{} - storing pending transactions {}", writerId(), this.txnsPendingCommit);

            return this.txnsPendingCommit.stream()
                    .map(v -> new PendingTransaction(
                            v.transaction().getTxnId(),
                            FlinkPravegaWriter.this.stream.getScope(),
                            FlinkPravegaWriter.this.stream.getStreamName(),
                            v.watermark()))
                    .collect(Collectors.toList());
        }

        /**
         * Commits, oldest first, every queued transaction whose checkpoint id is less than or
         * equal to the completed one. A transaction with a recorded watermark is committed
         * with that watermark.
         *
         * @throws IllegalStateException if no transaction is queued.
         */
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            Preconditions.checkState(!this.txnsPendingCommit.isEmpty(), (Object) "checkpoint completed, but no transaction pending");

            TransactionAndCheckpoint<T> pending;
            while ((pending = this.txnsPendingCommit.peekFirst()) != null && pending.checkpointId() <= checkpointId) {
                this.txnsPendingCommit.removeFirst();
                final Long watermark = pending.watermark();
                final String watermarkMsg = watermark == null ? "" : " at watermark " + watermark;
                log.info("{} - checkpoint {} complete{}, committing completed checkpoint transaction {}",
                        writerId(), checkpointId, watermarkMsg, pending.transaction().getTxnId());
                if (watermark != null) {
                    pending.transaction().commit(watermark);
                    log.debug("{} - committed checkpoint transaction {} at watermark {}",
                            writerId(), pending.transaction().getTxnId(), watermark);
                } else {
                    pending.transaction().commit();
                    log.debug("{} - committed checkpoint transaction {}", writerId(), pending.transaction().getTxnId());
                }
            }
        }

        /**
         * Commits the transactions recorded in a restored checkpoint. Transactions are grouped
         * by scope and stream; for each group a temporary client factory and transactional
         * writer are created and closed again. Transactions that are still open are committed,
         * already committed/committing ones are skipped, and any other status is logged as a
         * warning.
         *
         * <p>Every group is attempted even if an earlier one fails. Failures are logged and
         * then rethrown after the loop (first failure, later ones suppressed).
         *
         * @param pendingTransactionList transactions from the restored state; null or empty is a no-op.
         * @throws Exception the first failure encountered while restoring.
         */
        @Override
        public void restoreState(List<PendingTransaction> pendingTransactionList) throws Exception {
            if (pendingTransactionList == null || pendingTransactionList.isEmpty()) {
                return;
            }

            Exception exception = null;
            final Map<Stream, List<PendingTransaction>> pendingTransactionsMap = pendingTransactionList.stream()
                    .collect(Collectors.groupingBy(s -> Stream.of(s.getScope(), s.getStream())));
            log.debug("pendingTransactionsMap:: {}", pendingTransactionsMap);

            for (Map.Entry<Stream, List<PendingTransaction>> transactionsEntry : pendingTransactionsMap.entrySet()) {
                final String scope = transactionsEntry.getKey().getScope();
                final String streamName = transactionsEntry.getKey().getStreamName();
                final Serializer<T> eventSerializer = new FlinkSerializer<>(serializationSchema);
                final EventWriterConfig writerConfig = EventWriterConfig.builder()
                        .transactionTimeoutTime(txnLeaseRenewalPeriod)
                        .build();

                // Resources are closed in reverse order: writer first, then the factory.
                try (EventStreamClientFactory restoreClientFactory = createClientFactory(scope, clientConfig);
                     TransactionalEventStreamWriter<T> restorePravegaWriter =
                             restoreClientFactory.createTransactionalEventWriter(writerId(), streamName, eventSerializer, writerConfig)) {
                    log.info("restore state for the scope: {} and stream: {}", scope, streamName);
                    for (PendingTransaction pendingTransaction : transactionsEntry.getValue()) {
                        commitRestoredTransaction(restorePravegaWriter, pendingTransaction);
                    }
                } catch (Exception e) {
                    log.error("Exception occurred while restoring the state for scope: {} and stream: {}", scope, streamName, e);
                    // Remember the failure so it is reported once all streams were attempted.
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            if (exception != null) {
                throw exception;
            }
        }

        /** Commits one restored transaction if it is still open; otherwise only logs its status. */
        private void commitRestoredTransaction(TransactionalEventStreamWriter<T> restorePravegaWriter, PendingTransaction pendingTransaction) throws Exception {
            final UUID txnId = pendingTransaction.getUuid();
            final Transaction<T> txn = restorePravegaWriter.getTxn(txnId);
            final Transaction.Status status = txn.checkStatus();

            if (status == Transaction.Status.OPEN) {
                log.debug("{} - committing completed checkpoint transaction {} at Watermark {} after task restore",
                        writerId(), txnId, pendingTransaction.getWatermark());
                if (pendingTransaction.getWatermark() != null) {
                    txn.commit(pendingTransaction.getWatermark());
                } else {
                    txn.commit();
                }
                log.debug("{} - committed checkpoint transaction {}", writerId(), txnId);
            } else if (status == Transaction.Status.COMMITTED || status == Transaction.Status.COMMITTING) {
                log.debug("{} - at restore, transaction {} was already committed", writerId(), txnId);
            } else {
                log.warn("{} - found unexpected transaction status {} for transaction {} on task restore. Transaction probably timed out between failure and restore. ",
                        writerId(), status, txnId);
            }
        }
    }

    /**
     * Common base of the internal writers. Depending on the constructor flag it creates
     * either a transactional or a plain Pravega event writer (never both) and keeps track
     * of the last watermark set by the subclass.
     */
    @VisibleForTesting
    abstract class AbstractInternalWriter {
        /** Plain event writer; null when the transactional writer was requested. */
        private EventStreamWriter<T> pravegaWriter;
        /** Transactional event writer; null when the plain writer was requested. */
        private TransactionalEventStreamWriter<T> pravegaTxnWriter;
        /** Last watermark stored via {@link #setWatermark(long)}; starts at {@code Long.MIN_VALUE}. */
        private transient long watermark;

        /**
         * @param clientFactory factory used to create the Pravega writer.
         * @param txnWriter     true to create the transactional writer, false for the plain one.
         */
        AbstractInternalWriter(EventStreamClientFactory clientFactory, boolean txnWriter) {
            final FlinkSerializer<T> serializer = new FlinkSerializer<>(serializationSchema);
            final EventWriterConfig config = EventWriterConfig.builder()
                    .transactionTimeoutTime(txnLeaseRenewalPeriod)
                    .build();
            this.watermark = Long.MIN_VALUE;

            final String streamName = stream.getStreamName();
            if (!txnWriter) {
                this.pravegaWriter = clientFactory.createEventWriter(writerId(), streamName, serializer, config);
            } else {
                this.pravegaTxnWriter = clientFactory.createTransactionalEventWriter(writerId(), streamName, serializer, config);
            }
        }

        /**
         * Decides whether the context's current watermark should be noted: it must lie strictly
         * between {@code Long.MIN_VALUE} and {@code Long.MAX_VALUE}, be greater than the stored
         * watermark, and not exceed the event timestamp.
         */
        boolean shouldEmitWatermark(SinkFunction.Context context) {
            if (context.currentWatermark() <= Long.MIN_VALUE) {
                return false;
            }
            if (context.currentWatermark() >= Long.MAX_VALUE) {
                return false;
            }
            if (this.watermark >= context.currentWatermark()) {
                return false;
            }
            return context.timestamp() >= context.currentWatermark();
        }

        abstract void open() throws Exception;

        abstract void write(T var1, SinkFunction.Context var2, boolean var3) throws Exception;

        /** Closes whichever Pravega writer was created. */
        void close() throws Exception {
            final EventStreamWriter<T> plain = this.pravegaWriter;
            if (plain != null) {
                plain.close();
            }
            final TransactionalEventStreamWriter<T> transactional = this.pravegaTxnWriter;
            if (transactional != null) {
                transactional.close();
            }
        }

        abstract List<PendingTransaction> snapshotState(long var1, long var3, boolean var5) throws Exception;

        abstract void restoreState(List<PendingTransaction> var1) throws Exception;

        abstract void notifyCheckpointComplete(long var1) throws Exception;

        /** @return the plain event writer, or null for a transactional instance. */
        public EventStreamWriter<T> getPravegaWriter() {
            return pravegaWriter;
        }

        /** @return the transactional event writer, or null for a non-transactional instance. */
        public TransactionalEventStreamWriter<T> getPravegaTxnWriter() {
            return pravegaTxnWriter;
        }

        /** @return the stored watermark. */
        public long getWatermark() {
            return watermark;
        }

        /** Stores the given watermark. */
        public void setWatermark(long watermark) {
            this.watermark = watermark;
        }
    }

    /**
     * Immutable triple of a transaction, the id of the checkpoint it belongs to, and an
     * optional watermark.
     *
     * @param <T> event type of the transaction.
     */
    @VisibleForTesting
    static final class TransactionAndCheckpoint<T> {
        private final Transaction<T> transaction;
        private final long checkpointId;
        /** Null when the entry was created without a watermark. */
        private final Long watermark;

        /** Creates an entry without a watermark. */
        TransactionAndCheckpoint(Transaction<T> transaction, long checkpointId) {
            this(transaction, checkpointId, null);
        }

        /** Creates an entry carrying the given watermark (which may be null). */
        TransactionAndCheckpoint(Transaction<T> transaction, long checkpointId, Long watermark) {
            this.transaction = transaction;
            this.checkpointId = checkpointId;
            this.watermark = watermark;
        }

        /** @return the transaction. */
        Transaction<T> transaction() {
            return transaction;
        }

        /** @return the checkpoint id. */
        long checkpointId() {
            return checkpointId;
        }

        /** @return the watermark, or null. */
        Long watermark() {
            return watermark;
        }

        /** Renders checkpoint id, transaction id and watermark for log output. */
        public String toString() {
            final StringBuilder text = new StringBuilder("(checkpoint: ");
            text.append(checkpointId);
            text.append(", transaction: ").append(transaction.getTxnId());
            text.append(", watermark: ").append(watermark());
            return text.append(')').toString();
        }
    }

    /**
     * Adapts a Flink {@code SerializationSchema} to Pravega's {@code Serializer}.
     * Only serialization is supported.
     *
     * @param <T> type of the serialized values.
     */
    @VisibleForTesting
    static final class FlinkSerializer<T> implements Serializer<T> {
        private final SerializationSchema<T> serializationSchema;

        /** @param serializationSchema schema that produces the bytes for each value. */
        FlinkSerializer(SerializationSchema<T> serializationSchema) {
            this.serializationSchema = serializationSchema;
        }

        /** Serializes the value with the schema and wraps the resulting array in a buffer. */
        @Override
        public ByteBuffer serialize(T value) {
            final byte[] payload = serializationSchema.serialize(value);
            return ByteBuffer.wrap(payload);
        }

        /**
         * Not supported.
         *
         * @throws IllegalStateException always.
         */
        @Override
        public T deserialize(ByteBuffer serializedValue) {
            throw new IllegalStateException("deserialize() called within a serializer");
        }
    }

    /** Gauge whose value is a fixed stream name supplied at construction. */
    private static class StreamNameGauge implements Gauge<String> {
        final String stream;

        /** @param stream the name this gauge reports. */
        public StreamNameGauge(String stream) {
            this.stream = stream;
        }

        /** @return the stream name given to the constructor. */
        public String getValue() {
            return stream;
        }
    }
}

