/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link FlinkExecutableStageContext.Factory} that shares one reference-counted
 * {@link FlinkExecutableStageContext} per job id.
 *
 * <p>Every call to {@link #get(JobInfo)} increments the reference count of the context cached for
 * the job; every call to {@code close()} on the returned context schedules a release. Once the
 * count drops to zero the wrapper is removed from the cache and the underlying context is closed.
 *
 * <p>The cache and the executor are {@code transient} and created lazily on first use.
 */
public class ReferenceCountingFlinkExecutableStageContextFactory
        implements FlinkExecutableStageContext.Factory {
    private static final Logger LOG = LoggerFactory.getLogger(ReferenceCountingFlinkExecutableStageContextFactory.class);
    /** Maximum number of attempts {@link #get(JobInfo)} makes to obtain a live context. */
    private static final int MAX_RETRY = 3;
    /** Builds the underlying context the first time a job id is seen. */
    private final Creator creator;
    /** Lazily created scheduler used for delayed releases; see {@link #getExecutor()}. */
    private transient volatile ScheduledExecutorService executor;
    /** Lazily created cache of wrappers keyed by job id; see {@link #getCache()}. */
    private transient volatile ConcurrentHashMap<String, WrappedContext> keyRegistry;

    /**
     * Creates a factory that uses {@code creator} to build the underlying contexts.
     *
     * @param creator function producing a {@link FlinkExecutableStageContext} for a job
     * @return a new factory instance
     */
    public static ReferenceCountingFlinkExecutableStageContextFactory create(Creator creator) {
        return new ReferenceCountingFlinkExecutableStageContextFactory(creator);
    }

    private ReferenceCountingFlinkExecutableStageContextFactory(Creator creator) {
        this.creator = creator;
    }

    /**
     * Returns the shared context for the given job, creating it if necessary, and increments its
     * reference count.
     *
     * <p>A cached wrapper whose reference count has been cleared is being released concurrently;
     * in that case the lookup is retried, up to {@link #MAX_RETRY} attempts in total.
     *
     * @param jobInfo the job whose context is requested
     * @return the reference-counted wrapper around the job's context
     * @throws RuntimeException if the creator fails or all attempts are exhausted
     */
    @Override
    public FlinkExecutableStageContext get(JobInfo jobInfo) {
        for (int attempt = 0; attempt < MAX_RETRY; attempt++) {
            WrappedContext wrapper = getCache().computeIfAbsent(jobInfo.jobId(), jobId -> {
                try {
                    return new WrappedContext(jobInfo, creator.apply(jobInfo));
                } catch (Exception e) {
                    throw new RuntimeException("Unable to create context for job " + jobInfo.jobId(), e);
                }
            });
            // Lock on the wrapper so the liveness check and the increment cannot interleave with
            // release(), which clears referenceCount under the same monitor.
            synchronized (wrapper) {
                if (wrapper.referenceCount != null) {
                    wrapper.referenceCount.incrementAndGet();
                    return wrapper;
                }
            }
            // referenceCount == null: this wrapper was released; look up again.
        }
        throw new RuntimeException(String.format("Max retry %s exhausted while creating Context for job %s", MAX_RETRY, jobInfo.jobId()));
    }

    /**
     * Releases one reference to the cached context of the given job, either immediately or after
     * the environment cache TTL read from the job's pipeline options.
     *
     * <p>The release is only deferred when the TTL is positive and this class was loaded by the
     * same class loader as Flink's {@link ExecutionEnvironment}; otherwise it happens
     * synchronously.
     *
     * @param jobInfo the job whose context reference is given up
     * @throws IllegalStateException if no context is cached for the job
     */
    private void scheduleRelease(JobInfo jobInfo) {
        WrappedContext wrapper = getCache().get(jobInfo.jobId());
        Preconditions.checkState(wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId());

        PipelineOptions pipelineOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
        int environmentCacheTTLMillis = pipelineOptions.as(PortablePipelineOptions.class).getEnvironmentCacheMillis();
        if (environmentCacheTTLMillis <= 0) {
            release(wrapper);
            return;
        }

        if (getClass().getClassLoader() != ExecutionEnvironment.class.getClassLoader()) {
            LOG.warn("{} is not loaded on parent Flink classloader. Falling back to synchronous environment release for job {}.", getClass(), jobInfo.jobId());
            release(wrapper);
        } else {
            getExecutor().schedule(() -> release(wrapper), (long) environmentCacheTTLMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Returns the job-id keyed cache, creating it on first use. */
    private ConcurrentHashMap<String, WrappedContext> getCache() {
        ConcurrentHashMap<String, WrappedContext> cache = keyRegistry;
        if (cache != null) {
            return cache;
        }
        synchronized (this) {
            if (keyRegistry == null) {
                keyRegistry = new ConcurrentHashMap<>();
            }
            return keyRegistry;
        }
    }

    /** Returns the single-threaded daemon scheduler, creating it on first use. */
    private ScheduledExecutorService getExecutor() {
        ScheduledExecutorService current = executor;
        if (current != null) {
            return current;
        }
        synchronized (this) {
            if (executor == null) {
                executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
            }
            return executor;
        }
    }

    /**
     * Drops one reference to the given context. When the count reaches zero the wrapper is marked
     * as released, removed from the cache and its underlying context is closed; failures while
     * closing are logged and not propagated.
     *
     * @param context a context previously returned by {@link #get(JobInfo)}; must be a
     *     {@link WrappedContext}
     */
    @VisibleForTesting
    void release(FlinkExecutableStageContext context) {
        WrappedContext wrapper = (WrappedContext) context;
        synchronized (wrapper) {
            if (wrapper.referenceCount.decrementAndGet() != 0) {
                return;
            }
            // A null count tells get() that this wrapper must no longer be handed out.
            wrapper.referenceCount = null;
            if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
                try {
                    wrapper.closeActual();
                } catch (Throwable t) {
                    LOG.error("Unable to close FlinkExecutableStageContext.", t);
                }
            }
        }
    }

    /** Serializable function that creates the {@link FlinkExecutableStageContext} for a job. */
    public static interface Creator
            extends ThrowingFunction<JobInfo, FlinkExecutableStageContext>, Serializable {
    }

    /**
     * Reference-counted wrapper around a {@link FlinkExecutableStageContext}. Its {@link #close()}
     * does not close the wrapped context directly but asks the enclosing factory to release one
     * reference.
     */
    @VisibleForTesting
    class WrappedContext implements FlinkExecutableStageContext {
        private JobInfo jobInfo;
        /** Number of outstanding references; set to {@code null} once the wrapper is released. */
        private AtomicInteger referenceCount;
        @VisibleForTesting
        FlinkExecutableStageContext context;

        WrappedContext(JobInfo jobInfo, FlinkExecutableStageContext context) {
            this.jobInfo = jobInfo;
            this.context = context;
            this.referenceCount = new AtomicInteger(0);
        }

        @Override
        public StageBundleFactory getStageBundleFactory(ExecutableStage executableStage) {
            return context.getStageBundleFactory(executableStage);
        }

        /** Gives up one reference via the enclosing factory instead of closing directly. */
        @Override
        public void close() {
            ReferenceCountingFlinkExecutableStageContextFactory.this.scheduleRelease(jobInfo);
        }

        /** Closes the wrapped context itself. */
        private void closeActual() throws Exception {
            context.close();
        }

        /** Two wrappers are equal when they belong to the same job id. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            WrappedContext that = (WrappedContext) o;
            return Objects.equals(jobInfo.jobId(), that.jobInfo.jobId());
        }

        /** Hashes the job id only, so that equal wrappers always share a hash code. */
        @Override
        public int hashCode() {
            return Objects.hash(jobInfo.jobId());
        }

        @Override
        public String toString() {
            return "ContextWrapper{jobId='" + jobInfo + '\'' + ", referenceCount=" + referenceCount + '}';
        }
    }
}

