/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink;

import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Stream;
import io.pravega.connectors.flink.AbstractWriterBuilder;
import io.pravega.connectors.flink.FlinkPravegaWriter;
import io.pravega.connectors.flink.PravegaEventRouter;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Flink {@link RichOutputFormat} that writes records to a Pravega stream.
 *
 * <p>Each record is handed to an {@link EventStreamWriter}; the returned future is observed on a
 * dedicated executor. Outstanding writes are tracked in {@link #pendingWritesCount}, and the first
 * asynchronous failure is remembered in {@link #writeError} and surfaced as an {@link IOException}
 * on the next {@link #writeRecord(Object)} call or when the format is closed.
 *
 * @param <T> the type of the records written by this format
 */
public class FlinkPravegaOutputFormat<T> extends RichOutputFormat<T> {
    private static final Logger log = LoggerFactory.getLogger(FlinkPravegaOutputFormat.class);
    private static final long serialVersionUID = 1L;

    /** Name of the destination stream (taken from the {@link Stream} passed to the constructor). */
    private final String stream;

    /** Scope of the destination stream (taken from the {@link Stream} passed to the constructor). */
    private final String scope;

    /** Schema handed to the writer's serializer in {@link #open(int, int)}. */
    private final SerializationSchema<T> serializationSchema;

    /** Client factory; created in {@link #open(int, int)}, closed in {@link #close()}. */
    private transient EventStreamClientFactory clientFactory;

    /** Client configuration used to create the client factory. */
    private final ClientConfig clientConfig;

    /** Optional routing-key supplier; when {@code null}, events are written without a routing key. */
    private final PravegaEventRouter<T> eventRouter;

    /** Event writer; created in {@link #open(int, int)}, closed in {@link #close()}. */
    private transient EventStreamWriter<T> pravegaWriter;

    /** First write failure seen by a completion callback that has not yet been reported. */
    private final AtomicReference<Throwable> writeError;

    /** Number of writes that were issued and whose completion callback has not run yet. */
    private final AtomicInteger pendingWritesCount;

    /** Executor on which write-completion callbacks run; created in {@link #open(int, int)}. */
    private transient ExecutorService executorService;

    /**
     * Creates a new output format.
     *
     * @param clientConfig the Pravega client configuration; must not be {@code null}
     * @param stream the destination stream; must not be {@code null}
     * @param serializationSchema the schema used to serialize records; must not be {@code null}
     * @param eventRouter supplies a routing key per record; may be {@code null}
     */
    public FlinkPravegaOutputFormat(ClientConfig clientConfig, Stream stream, SerializationSchema<T> serializationSchema, PravegaEventRouter<T> eventRouter) {
        this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig");
        Preconditions.checkNotNull(stream, "stream");
        this.stream = stream.getStreamName();
        this.scope = stream.getScope();
        this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
        this.eventRouter = eventRouter;
        this.writeError = new AtomicReference<>(null);
        this.pendingWritesCount = new AtomicInteger(0);
    }

    /**
     * No-op: this format takes all of its settings through the constructor.
     *
     * @param parameters ignored
     */
    @Override
    public void configure(Configuration parameters) {
    }

    /**
     * Creates the client factory, the event writer and the callback executor.
     *
     * @param taskNumber not used by this implementation
     * @param numTasks not used by this implementation
     * @throws IOException declared by the overridden method; not thrown directly here
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        FlinkPravegaWriter.FlinkSerializer<T> eventSerializer = new FlinkPravegaWriter.FlinkSerializer<T>(this.serializationSchema);
        // Writer configuration is built without overriding any option.
        EventWriterConfig writerConfig = EventWriterConfig.builder().build();
        this.clientFactory = this.createClientFactory(this.scope, this.clientConfig);
        this.pravegaWriter = this.clientFactory.createEventWriter(this.stream, eventSerializer, writerConfig);
        this.executorService = this.createExecutorService();
    }

    /**
     * Writes one record asynchronously.
     *
     * <p>Before writing, any failure recorded by an earlier write is reported (and cleared). The
     * pending-write counter is incremented before the write is issued and decremented by the
     * completion callback.
     *
     * @param record the record to write
     * @throws IOException if an earlier asynchronous write failed
     */
    @Override
    public void writeRecord(T record) throws IOException {
        this.checkWriteError();
        this.pendingWritesCount.incrementAndGet();
        try {
            final CompletableFuture<Void> future = this.eventRouter != null
                    ? this.pravegaWriter.writeEvent(this.eventRouter.getRoutingKey(record), record)
                    : this.pravegaWriter.writeEvent(record);
            future.whenCompleteAsync((result, error) -> {
                if (error != null) {
                    // Pass the throwable as the exception argument so the stack trace is logged.
                    log.warn("Detected a write failure", error);
                    // Keep only the first unreported failure.
                    this.writeError.compareAndSet(null, error);
                }
                this.completePendingWrite();
            }, this.executorService);
        } catch (RuntimeException syncFailure) {
            // No completion callback was registered, so nothing else will ever decrement the
            // counter for this record; undo the increment here or close() would wait forever.
            this.completePendingWrite();
            throw syncFailure;
        }
    }

    /** Marks one pending write as finished and wakes any thread waiting in {@link #flushAndVerify()}. */
    private void completePendingWrite() {
        synchronized (this) {
            this.pendingWritesCount.decrementAndGet();
            this.notifyAll();
        }
    }

    /**
     * Flushes outstanding writes and releases the writer, the client factory and the executor.
     *
     * <p>Every release step is attempted even if an earlier one fails; the first failure becomes
     * the cause of the thrown exception and later ones are attached to it as suppressed. Calling
     * this method when {@link #open(int, int)} never created a writer skips the flush.
     *
     * @throws IOException if flushing, a pending write, or releasing a resource failed
     */
    @Override
    public void close() throws IOException {
        Exception exception = null;
        if (this.pravegaWriter != null) {
            try {
                this.flushAndVerify();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
            try {
                this.pravegaWriter.close();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }
        if (this.clientFactory != null) {
            try {
                this.clientFactory.close();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }
        if (this.executorService != null) {
            try {
                this.executorService.shutdown();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }
        if (exception != null) {
            throw new IOException("exception occurred while trying to close the writer", exception);
        }
    }

    /**
     * Flushes the writer, blocks until every pending write's callback has run, then reports any
     * recorded write failure.
     *
     * @throws IOException if the waiting thread is interrupted or a write failed
     */
    private void flushAndVerify() throws IOException {
        this.pravegaWriter.flush();
        synchronized (this) {
            while (this.pendingWritesCount.get() > 0) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    throw new IOException("received interrupted exception while waiting for the writes to complete", e);
                }
            }
        }
        this.checkWriteError();
    }

    /**
     * Throws if a write failure has been recorded, clearing the recorded failure in the process.
     *
     * @throws IOException wrapping the recorded failure
     */
    @VisibleForTesting
    protected void checkWriteError() throws IOException {
        Throwable error = this.writeError.getAndSet(null);
        if (error != null) {
            throw new IOException("Write failure", error);
        }
    }

    /**
     * Creates the client factory for the given scope; overridable for tests.
     *
     * @param scopeName the scope to bind the factory to
     * @param clientConfig the client configuration
     * @return a new client factory
     */
    @VisibleForTesting
    protected EventStreamClientFactory createClientFactory(String scopeName, ClientConfig clientConfig) {
        return EventStreamClientFactory.withScope(scopeName, clientConfig);
    }

    /**
     * Creates the single-threaded executor that runs write-completion callbacks; overridable for tests.
     *
     * @return a new executor service
     */
    @VisibleForTesting
    protected ExecutorService createExecutorService() {
        return Executors.newSingleThreadExecutor();
    }

    /** @return the serialization schema supplied at construction */
    @VisibleForTesting
    protected SerializationSchema<T> getSerializationSchema() {
        return this.serializationSchema;
    }

    /** @return the destination stream name */
    @VisibleForTesting
    protected String getStream() {
        return this.stream;
    }

    /** @return the destination stream scope */
    @VisibleForTesting
    protected String getScope() {
        return this.scope;
    }

    /** @return {@code true} if a write failure is recorded and has not been reported yet */
    @VisibleForTesting
    protected boolean isErrorOccurred() {
        return this.writeError.get() != null;
    }

    /** @return the live counter of pending writes (not a copy) */
    @VisibleForTesting
    protected AtomicInteger getPendingWritesCount() {
        return this.pendingWritesCount;
    }

    /** @return the event router supplied at construction; may be {@code null} */
    @VisibleForTesting
    protected PravegaEventRouter<T> getEventRouter() {
        return this.eventRouter;
    }

    /**
     * Creates a builder for this output format.
     *
     * @param <T> the type of the records to write
     * @return a new builder
     */
    public static <T> Builder<T> builder() {
        return new Builder<>();
    }

    /**
     * Builder for {@link FlinkPravegaOutputFormat}.
     *
     * @param <T> the type of the records to write
     */
    public static class Builder<T> extends AbstractWriterBuilder<Builder<T>> {
        private SerializationSchema<T> serializationSchema;
        private PravegaEventRouter<T> eventRouter;

        /**
         * Sets the serialization schema (required by {@link #build()}).
         *
         * @param serializationSchema the schema used to serialize records
         * @return this builder
         */
        public Builder<T> withSerializationSchema(SerializationSchema<T> serializationSchema) {
            this.serializationSchema = serializationSchema;
            return this.builder();
        }

        /**
         * Sets the event router (optional).
         *
         * @param eventRouter supplies a routing key per record
         * @return this builder
         */
        public Builder<T> withEventRouter(PravegaEventRouter<T> eventRouter) {
            this.eventRouter = eventRouter;
            return this.builder();
        }

        @Override
        protected Builder<T> builder() {
            return this;
        }

        /**
         * Builds the output format from the configured values.
         *
         * @return a new {@link FlinkPravegaOutputFormat}
         * @throws NullPointerException if no serialization schema was set
         */
        public FlinkPravegaOutputFormat<T> build() {
            Preconditions.checkNotNull(this.serializationSchema, "serializationSchema");
            return new FlinkPravegaOutputFormat<T>(this.getPravegaConfig().getClientConfig(), this.resolveStream(), this.serializationSchema, this.eventRouter);
        }
    }
}

