/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.tez;

import io.trino.hive.$internal.com.google.common.annotations.VisibleForTesting;
import io.trino.hive.$internal.com.google.common.base.Preconditions;
import io.trino.hive.$internal.com.google.common.util.concurrent.Futures;
import io.trino.hive.$internal.com.google.common.util.concurrent.ListenableFuture;
import io.trino.hive.$internal.com.google.common.util.concurrent.ListenableFutureTask;
import io.trino.hive.$internal.com.google.common.util.concurrent.SettableFuture;
import io.trino.hive.$internal.org.slf4j.Logger;
import io.trino.hive.$internal.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;

class TezSessionPool<SessionType extends TezSessionPoolSession> {
    private static final Logger LOG = LoggerFactory.getLogger(TezSessionPool.class);

    /** Configuration captured at construction; read to obtain the session-init thread limit. */
    private final HiveConf initConf;

    /** Number of sessions {@code start()} creates up front; zero skips pre-creation entirely. */
    private int initialSize = 0;

    /** Factory producing session objects, both for initial population and for replacements. */
    private final SessionObjectFactory<SessionType> sessionObjFactory;

    /** Fair lock guarding {@link #pool} and {@link #asyncRequests}. */
    private final ReentrantLock poolLock = new ReentrantLock(true);

    /** Signalled (under {@link #poolLock}) whenever a session is added to {@link #pool}. */
    private final Condition notEmpty = this.poolLock.newCondition();

    /** Idle sessions available for hand-out; accessed only while holding {@link #poolLock}. */
    private final LinkedList<SessionType> pool = new LinkedList<>();

    /** Futures of asynchronous callers waiting for a session, served in FIFO order. */
    private final LinkedList<SettableFuture<SessionType>> asyncRequests = new LinkedList<>();

    /**
     * Outstanding resize work. A positive value is the number of sessions that still have to be
     * created (consumed by the session-creating workers); a negative value is the number of
     * sessions that must be discarded instead of re-pooled when they are handed back.
     */
    private final AtomicInteger deltaRemaining = new AtomicInteger();

    /** Name of the AM registry, or {@code null} when no registry is in use. */
    private final String amRegistryName;

    /** AM registry client, or {@code null} when no registry is in use. */
    private final TezAmRegistryImpl amRegistry;

    /** Sessions indexed by session ID; populated only when an AM registry name is configured. */
    private final ConcurrentHashMap<String, SessionType> bySessionId = new ConcurrentHashMap<>();

    /** {@link SessionState} of the thread that called {@code start()}; reused on worker threads. */
    private SessionState parentSessionState;

    /**
     * Creates an empty pool; no sessions are opened until {@code start()} is called.
     *
     * @param initConf configuration used for the registry and for pool sizing parameters
     * @param numSessionsTotal number of sessions to create when the pool is started
     * @param useAmRegistryIfPresent whether to try to create an AM registry client
     * @param sessionFactory factory used to instantiate session objects
     */
    TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent, SessionObjectFactory<SessionType> sessionFactory) {
        this.initConf = initConf;
        this.initialSize = numSessionsTotal;
        this.sessionObjFactory = sessionFactory;
        if (useAmRegistryIfPresent) {
            // The factory call may itself yield null, hence the null check below.
            this.amRegistry = TezAmRegistryImpl.create((Configuration) initConf, true);
        } else {
            this.amRegistry = null;
        }
        if (this.amRegistry != null) {
            this.amRegistryName = this.amRegistry.getRegistryName();
        } else {
            this.amRegistryName = null;
        }
    }

    /**
     * Starts the AM registry client (when present) and creates the initial sessions.
     *
     * <p>With a single init thread the sessions are created sequentially on the calling thread,
     * stopping early if the factory returns {@code null}. Otherwise the work is split between
     * several workers that share one countdown; the calling thread acts as one of the workers
     * and then waits for all the others.
     *
     * @throws Exception if the registry fails to start, a session cannot be opened, or any
     *     worker fails (surfaced through {@link FutureTask#get()})
     */
    void start() throws Exception {
        if (this.amRegistry != null) {
            this.amRegistry.start();
            this.amRegistry.initializeWithoutRegistering();
            this.amRegistry.registerStateChangeListener(new ChangeListener());
            this.amRegistry.populateCache(true);
        }
        // Remember the caller's SessionState so that worker threads can adopt it later.
        this.parentSessionState = SessionState.get();
        if (this.initialSize == 0) {
            return;
        }
        int threadCount = Math.min(this.initialSize, HiveConf.getIntVar(this.initConf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
        Preconditions.checkArgument(threadCount > 0);
        if (threadCount == 1) {
            for (int i = 0; i < this.initialSize; ++i) {
                SessionType session = this.sessionObjFactory.create(null);
                if (session == null) {
                    break;
                }
                this.startInitialSession(session);
            }
            return;
        }

        // All workers decrement the same counter, so exactly initialSize sessions are attempted.
        AtomicInteger remaining = new AtomicInteger(this.initialSize);
        List<FutureTask<Boolean>> threadTasks = new ArrayList<FutureTask<Boolean>>(threadCount);
        for (int i = 0; i < threadCount; ++i) {
            threadTasks.add(new FutureTask<Boolean>(new CreateSessionsRunnable(remaining)));
        }
        // Launch the background workers first (highest index first)...
        for (int i = threadCount - 1; i > 0; --i) {
            new Thread(threadTasks.get(i), "Tez session init " + i).start();
        }
        // ...then do task 0's share of the work on the calling thread.
        threadTasks.get(0).run();
        for (FutureTask<Boolean> task : threadTasks) {
            task.get();
        }
    }

    /**
     * Blocks until an idle session can be taken from the pool and successfully marked as in use.
     *
     * <p>The pool is re-polled at least every 100 ms while empty. A polled session whose
     * {@code tryUse(false)} fails is not put back; the method simply moves on to the next one.
     *
     * @return a session that has been acquired for the caller
     * @throws Exception if the wait is interrupted or acquiring the session fails
     */
    SessionType getSession() throws Exception {
        for (;;) {
            SessionType candidate;
            this.poolLock.lock();
            try {
                candidate = this.pool.poll();
                while (candidate == null) {
                    this.notEmpty.await(100L, TimeUnit.MILLISECONDS);
                    candidate = this.pool.poll();
                }
            } finally {
                this.poolLock.unlock();
            }
            // The usability check deliberately happens outside the lock.
            if (!candidate.tryUse(false)) {
                LOG.info("Couldn't use a session [" + candidate + "]; attempting another one");
                continue;
            }
            return candidate;
        }
    }

    /**
     * Non-blocking variant of {@code getSession()}.
     *
     * <p>If a usable idle session exists, the returned future is already completed with it.
     * Otherwise the future is queued and completed later, when a session is put back into
     * the pool. Polled sessions that cannot be acquired are dropped from the pool.
     *
     * @return a future that yields an acquired session
     * @throws Exception if acquiring a polled session fails
     */
    ListenableFuture<SessionType> getSessionAsync() throws Exception {
        SettableFuture<SessionType> future = SettableFuture.create();
        this.poolLock.lock();
        try {
            SessionType candidate;
            while ((candidate = this.pool.poll()) != null) {
                if (candidate.tryUse(false)) {
                    future.set(candidate);
                    return future;
                }
            }
            // Nothing usable right now; park the request until a session is returned.
            this.asyncRequests.add(future);
            return future;
        } finally {
            this.poolLock.unlock();
        }
    }

    /**
     * Hands a session back to the pool; if the pool does not need it any more, the session is
     * closed on the calling thread.
     *
     * @param session the session being returned
     */
    void returnSession(SessionType session) {
        final boolean isAsync = false;
        returnSessionInternal(session, isAsync);
    }

    /**
     * Hands a session back to the pool without ever closing it on the calling thread.
     *
     * @param session the session being returned
     * @return {@code false} if the pool did not take the session back, in which case the
     *     session has been left open; {@code true} otherwise
     */
    boolean returnSessionAsync(SessionType session) {
        final boolean isAsync = true;
        return returnSessionInternal(session, isAsync);
    }

    /**
     * Shared implementation of the synchronous and asynchronous return paths.
     *
     * @param session the session being returned
     * @param isAsync when {@code true}, an unneeded session is left open and {@code false} is
     *     returned; when {@code false}, an unneeded session is closed here
     * @return {@code false} only in the asynchronous case where the pool rejected the session
     */
    private boolean returnSessionInternal(SessionType session, boolean isAsync) {
        // Detach the Tez session from the current thread's SessionState, if there is one.
        SessionState current = SessionState.get();
        if (current != null) {
            current.setTezSession(null);
        }

        boolean rejectedByPool = session.stopUsing() && !putSessionBack(session, true);
        if (!rejectedByPool) {
            // Either stopUsing() declined (nothing more to do here) or the pool took it back.
            return true;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Closing an unneeded returned session " + session);
        }
        if (isAsync) {
            return false;
        }
        try {
            session.close(false);
        } catch (Exception ex) {
            LOG.error("Failed to close " + session, ex);
        }
        return true;
    }

    /**
     * Offers a session to the pool.
     *
     * <p>In order: (1) if a shrink is pending (negative {@code deltaRemaining}), one unit of it
     * is consumed and the session is rejected; (2) if an asynchronous request is waiting, the
     * session is acquired for it and its future is completed outside the lock; (3) otherwise the
     * session is queued and waiters are signalled.
     *
     * @param session the session to add
     * @param isFirst whether to insert at the head ({@code true}) or the tail of the pool
     * @return {@code false} if the session was rejected because of a pending shrink;
     *     {@code true} otherwise, including when the session could not be acquired for a waiter
     */
    private boolean putSessionBack(SessionType session, boolean isFirst) {
        SettableFuture<SessionType> waiter = null;
        this.poolLock.lock();
        try {
            while (true) {
                int killsOwed = -this.deltaRemaining.get();
                if (killsOwed <= 0) {
                    break;
                }
                // Move the (negative) counter one step towards zero; retry on contention.
                if (this.deltaRemaining.compareAndSet(-killsOwed, -killsOwed + 1)) {
                    return false;
                }
            }

            if (!this.asyncRequests.isEmpty()) {
                if (!session.tryUse(false)) {
                    // Not usable at the moment; neither pooled nor handed to the waiter.
                    return true;
                }
                waiter = this.asyncRequests.poll();
            }

            if (waiter == null) {
                if (isFirst) {
                    this.pool.addFirst(session);
                } else {
                    this.pool.addLast(session);
                }
                this.notEmpty.signalAll();
            }
        } finally {
            this.poolLock.unlock();
        }

        // Complete the future only after releasing the lock.
        if (waiter != null) {
            waiter.set(session);
        }
        return true;
    }

    /**
     * Closes {@code oldSession} and opens a freshly created session in its place.
     *
     * <p>The replacement steps run in a {@code finally} block, so they are attempted even when
     * closing the old session throws. NOTE(review): as a consequence, an exception thrown while
     * opening the new session replaces any exception thrown by the close.
     *
     * @param oldSession the session to retire; also passed to the factory as the template
     * @throws Exception if closing the old session or opening the new one fails
     */
    void replaceSession(SessionType oldSession) throws Exception {
        SessionType newSession = this.sessionObjFactory.create(oldSession);
        String queueName = oldSession.getQueueName();
        try {
            oldSession.close(false);
        } finally {
            this.poolLock.lock();
            try {
                // No-op if the old session is not currently sitting in the pool.
                this.pool.remove(oldSession);
            } finally {
                this.poolLock.unlock();
            }
            this.notifyClosed(oldSession);
            // Carry the old session's queue over to the replacement.
            newSession.getConf().set("tez.queue.name", queueName);
            this.configureAmRegistry(newSession);
            // If this thread has no SessionState, adopt the one captured when the pool started.
            if (SessionState.get() == null && this.parentSessionState != null) {
                SessionState.setCurrentSessionState(this.parentSessionState);
            }
            newSession.open();
            if (!this.putSessionBack(newSession, false)) {
                // The pool shrank in the meantime; the replacement is not needed.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Closing an unneeded session " + newSession + "; trying to replace " + oldSession);
                }
                try {
                    newSession.close(false);
                } catch (Exception ex) {
                    LOG.error("Failed to close an unneeded session", ex);
                }
            }
        }
    }

    /**
     * Opens a newly created session and adds it to the tail of the pool.
     *
     * @param session the session to open
     * @throws IOException if the session cannot be acquired for initialization
     * @throws Exception if opening the session fails
     */
    private void startInitialSession(SessionType session) throws Exception {
        if (!session.tryUse(true)) {
            throw new IOException(session + " is not usable at pool startup");
        }
        session.getConf().set("tez.queue.name", session.getQueueName());
        configureAmRegistry(session);
        session.open();

        if (!session.stopUsing()) {
            return;
        }
        if (putSessionBack(session, false)) {
            return;
        }
        // The pool rejected the session (a shrink is pending), so dispose of it.
        LOG.warn("Couldn't add a session during initialization");
        try {
            session.close(false);
        } catch (Exception ex) {
            LOG.error("Failed to close an unneeded session", ex);
        }
    }

    /**
     * When an AM registry name is configured, indexes the session by its ID and writes the
     * registry name and the session ID into the session's configuration. Does nothing otherwise.
     *
     * @param session the session to configure
     */
    private void configureAmRegistry(SessionType session) {
        if (this.amRegistryName == null) {
            return;
        }
        this.bySessionId.put(session.getSessionId(), session);
        HiveConf sessionConf = session.getConf();
        sessionConf.set(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, this.amRegistryName);
        sessionConf.set(HiveConf.ConfVars.HIVESESSIONID.varname, session.getSessionId());
    }

    /**
     * Returns the number of sessions the pool was configured to create at startup.
     *
     * @return the initial pool size passed to the constructor
     */
    @VisibleForTesting
    int getInitialSize() {
        return initialSize;
    }

    /**
     * Grows or shrinks the pool by {@code delta} sessions.
     *
     * @param delta number of sessions to add (positive) or remove (negative); zero is a no-op
     * @param toClose receives idle sessions removed by a shrink; closing them is left to the
     *     caller. Only used when {@code delta} is negative.
     * @return a future for the resize work; already completed for no-op and shrink requests
     */
    public ListenableFuture<?> resizeAsync(int delta, List<SessionType> toClose) {
        if (delta == 0) {
            return createDummyFuture();
        }
        poolLock.lock();
        try {
            if (delta > 0) {
                return resizeUpInternal(delta);
            }
            return resizeDownInternal(-delta, toClose);
        } finally {
            poolLock.unlock();
        }
    }

    /**
     * Adds {@code delta} to the outstanding resize counter and, if the result is positive,
     * launches background threads that create that many sessions.
     *
     * @param delta number of sessions to add; expected to be positive
     * @return a future combining all creation tasks, or an already-completed future when the
     *     increment only offset a pending shrink
     */
    private ListenableFuture<?> resizeUpInternal(int delta) {
        // Atomic add; a non-positive result means we merely cancelled (part of) a pending shrink.
        int toStart = this.deltaRemaining.addAndGet(delta);
        if (toStart <= 0) {
            return createDummyFuture();
        }
        LOG.info("Resizing the pool; adding " + toStart + " sessions");

        int maxInitThreads = HiveConf.getIntVar(this.initConf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS);
        int threadCount = Math.max(1, Math.min(toStart, maxInitThreads));
        List<ListenableFutureTask<Boolean>> creationTasks = new ArrayList<ListenableFutureTask<Boolean>>(threadCount);
        for (int workerIx = 0; workerIx < threadCount; ++workerIx) {
            // Every worker drains the shared counter, one session per decrement.
            ListenableFutureTask<Boolean> creationTask = ListenableFutureTask.create(new CreateSessionsRunnable(this.deltaRemaining));
            new Thread(creationTask, "Tez pool resize " + workerIx).start();
            creationTasks.add(creationTask);
        }
        return Futures.allAsList(creationTasks);
    }

    /**
     * Shrinks the pool by {@code delta} sessions, in three steps:
     * <ol>
     *   <li>cancel as much of a still-pending expansion as possible;</li>
     *   <li>take idle sessions out of the pool and hand them to the caller via
     *       {@code toClose};</li>
     *   <li>record whatever is left as a negative balance, so that sessions are rejected as
     *       they are handed back to the pool.</li>
     * </ol>
     * Idle sessions that cannot be acquired in step 2 are dropped from the pool without being
     * counted towards the shrink.
     *
     * @param delta number of sessions to remove; expected to be positive
     * @param toClose output list receiving the sessions the caller must close
     * @return an already-completed future
     */
    private ListenableFuture<Boolean> resizeDownInternal(int delta, List<SessionType> toClose) {
        // 1) Offset against sessions that were requested by a resize-up but not yet created.
        while (true) {
            int pendingExpansion = this.deltaRemaining.get();
            if (pendingExpansion <= 0) {
                break;
            }
            int cancelled = Math.min(pendingExpansion, delta);
            if (this.deltaRemaining.compareAndSet(pendingExpansion, pendingExpansion - cancelled)) {
                delta -= cancelled;
                break;
            }
        }

        // 2) Drain idle sessions; closing them is delegated to the caller.
        while (delta > 0) {
            SessionType session = this.pool.poll();
            if (session == null) {
                break;
            }
            if (!session.tryUse(true)) {
                continue;
            }
            toClose.add(session);
            --delta;
        }

        // 3) Leave the remainder as a debt that is settled when sessions are put back.
        if (delta > 0) {
            this.deltaRemaining.addAndGet(-delta);
        }
        return this.createDummyFuture();
    }

    /**
     * Builds a future that is already completed with {@code true}.
     *
     * @return a completed future
     */
    private ListenableFuture<Boolean> createDummyFuture() {
        SettableFuture<Boolean> alreadyDone = SettableFuture.create();
        alreadyDone.set(Boolean.TRUE);
        return alreadyDone;
    }

    /**
     * Returns the number of idle sessions currently held in the pool, read under the pool lock.
     *
     * @return the current size of the idle-session queue
     */
    @VisibleForTesting
    int getCurrentSize() {
        poolLock.lock();
        try {
            return pool.size();
        } finally {
            poolLock.unlock();
        }
    }

    /**
     * Drops the session from the by-ID index.
     *
     * @param session the session that has been closed
     */
    public void notifyClosed(TezSessionState session) {
        String closedSessionId = session.getSessionId();
        bySessionId.remove(closedSessionId);
    }

    /**
     * Worker that creates and opens sessions until a shared countdown reaches zero. Several
     * workers may share one counter; each decrement claims exactly one session to create.
     */
    private final class CreateSessionsRunnable
    implements Callable<Boolean> {
        /** Number of sessions still to be created, shared between workers. */
        private final AtomicInteger remaining;

        private CreateSessionsRunnable(AtomicInteger remaining) {
            this.remaining = remaining;
        }

        /**
         * Claims and creates sessions one at a time until none are left.
         *
         * @return always {@code true}
         * @throws Exception if creating or opening a session fails
         */
        @Override
        public Boolean call() throws Exception {
            // Adopt the SessionState captured when the pool started, if there was one.
            if (parentSessionState != null) {
                SessionState.setCurrentSessionState(parentSessionState);
            }
            while (true) {
                int left = remaining.get();
                if (left <= 0) {
                    return true;
                }
                // Only the worker that wins the CAS creates the session for this slot.
                if (remaining.compareAndSet(left, left - 1)) {
                    startInitialSession(sessionObjFactory.create(null));
                }
            }
        }
    }

    /**
     * Registry listener that forwards AM registration changes to the pool session with the
     * matching session ID. Events for session IDs not present in the index are logged and
     * ignored.
     */
    private final class ChangeListener
    implements ServiceInstanceStateChangeListener<TezAmInstance> {
        private ChangeListener() {
        }

        /** Applies a newly registered AM instance to the matching session. */
        public void onCreate(TezAmInstance si, int ephSeqVersion) {
            String amSessionId = si.getSessionId();
            TezSessionPoolSession target = bySessionId.get(amSessionId);
            if (target == null) {
                LOG.warn("AM for an unknown " + amSessionId + " has registered; ignoring");
                return;
            }
            LOG.info("AM for " + amSessionId + ", v." + ephSeqVersion + " has registered; updating [" + target + "] with an endpoint at " + si.getPluginPort());
            target.updateFromRegistry(si, ephSeqVersion);
        }

        /** Applies an updated AM instance to the matching session. */
        public void onUpdate(TezAmInstance si, int ephSeqVersion) {
            String amSessionId = si.getSessionId();
            TezSessionPoolSession target = bySessionId.get(amSessionId);
            if (target == null) {
                LOG.warn("AM for an unknown " + amSessionId + " has updated; ignoring");
                return;
            }
            LOG.info("AM for " + amSessionId + ", v." + ephSeqVersion + " has updated; updating [" + target + "] with an endpoint at " + si.getPluginPort());
            target.updateFromRegistry(si, ephSeqVersion);
        }

        /** Tells the matching session that its AM instance is gone by passing {@code null}. */
        public void onRemove(TezAmInstance serviceInstance, int ephSeqVersion) {
            String amSessionId = serviceInstance.getSessionId();
            TezSessionPoolSession target = bySessionId.get(amSessionId);
            if (target == null) {
                LOG.warn("AM for an unknown " + amSessionId + " has unregistered; ignoring");
                return;
            }
            LOG.info("AM for " + amSessionId + ", v." + ephSeqVersion + " has unregistered; updating [" + target + "]");
            target.updateFromRegistry(null, ephSeqVersion);
        }
    }

    /**
     * Creates the session objects managed by a pool.
     *
     * @param <SessionType> the concrete session type produced
     */
    public interface SessionObjectFactory<SessionType> {
        /**
         * Creates a new, not yet opened session object.
         *
         * @param oldSession the session being replaced, or {@code null} when a brand-new
         *     session is requested
         * @return the new session object
         */
        SessionType create(SessionType oldSession);
    }
}

