/*
 * Decompiled with CFR 0.152.
 */
package com.getindata.connectors.http.internal.table.lookup;

import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
import com.getindata.connectors.http.internal.table.lookup.HttpTableLookupFunction;
import com.getindata.connectors.http.internal.table.lookup.LookupRow;
import com.getindata.connectors.http.internal.utils.ThreadUtils;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import lombok.Generated;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncHttpTableLookupFunction
extends AsyncLookupFunction {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AsyncHttpTableLookupFunction.class);
    private static final String PULLING_THREAD_POOL_SIZE = "8";
    private static final String PUBLISHING_THREAD_POOL_SIZE = "4";
    private final HttpTableLookupFunction decorate;
    private transient ExecutorService pullingThreadPool;
    private transient ExecutorService publishingThreadPool;

    public void open(FunctionContext context) throws Exception {
        super.open(context);
        this.decorate.open(context);
        int pullingThreadPoolSize = Integer.parseInt(this.decorate.getOptions().getProperties().getProperty("gid.connector.http.source.lookup.request.thread-pool.size", PULLING_THREAD_POOL_SIZE));
        int publishingThreadPoolSize = Integer.parseInt(this.decorate.getOptions().getProperties().getProperty("gid.connector.http.source.lookup.response.thread-pool.size", PUBLISHING_THREAD_POOL_SIZE));
        this.pullingThreadPool = Executors.newFixedThreadPool(pullingThreadPoolSize, (ThreadFactory)new ExecutorThreadFactory("http-async-lookup-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));
        this.publishingThreadPool = Executors.newFixedThreadPool(publishingThreadPoolSize, (ThreadFactory)new ExecutorThreadFactory("http-async-publishing-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));
    }

    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        CompletableFuture<Collection> future = new CompletableFuture<Collection>();
        future.completeAsync(() -> this.decorate.lookup(keyRow), this.pullingThreadPool);
        CompletableFuture<Collection<RowData>> resultFuture = new CompletableFuture<Collection<RowData>>();
        future.whenCompleteAsync((result, throwable) -> {
            if (throwable != null) {
                log.error("Exception while processing Http Async request", throwable);
                resultFuture.completeExceptionally(new RuntimeException("Exception while processing Http Async request", (Throwable)throwable));
            } else {
                resultFuture.complete((Collection<RowData>)result);
            }
        }, (Executor)this.publishingThreadPool);
        return resultFuture;
    }

    public LookupRow getLookupRow() {
        return this.decorate.getLookupRow();
    }

    public HttpLookupConfig getOptions() {
        return this.decorate.getOptions();
    }

    public void close() throws Exception {
        this.publishingThreadPool.shutdownNow();
        this.pullingThreadPool.shutdownNow();
        super.close();
    }

    @Generated
    public AsyncHttpTableLookupFunction(HttpTableLookupFunction decorate) {
        this.decorate = decorate;
    }
}

