/*
 * Decompiled with CFR 0.152.
 */
package com.getindata.connectors.http.internal.table.lookup;

import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.PollingClientFactory;
import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
import com.getindata.connectors.http.internal.table.lookup.LookupRow;
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink {@link LookupFunction} that resolves lookup keys by delegating to a
 * {@link PollingClient} and counts how many lookups this instance has served.
 *
 * <p>The constructor only stores its collaborators. The polling client and the
 * call counter are {@code transient} and are created in {@link #open(FunctionContext)},
 * so {@link #lookup(RowData)} must not be called before {@code open}.
 */
public class HttpTableLookupFunction extends LookupFunction {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(HttpTableLookupFunction.class);

    /** Factory used in {@link #open(FunctionContext)} to build the polling client. */
    private final PollingClientFactory<RowData> pollingClientFactory;

    /** Decoder that is opened in {@link #open(FunctionContext)} and handed to the client factory. */
    private final DeserializationSchema<RowData> responseSchemaDecoder;

    /** Lookup row description; only stored and exposed through {@link #getLookupRow()}. */
    @VisibleForTesting
    private final LookupRow lookupRow;

    /** Lookup configuration passed to the client factory and exposed through {@link #getOptions()}. */
    @VisibleForTesting
    private final HttpLookupConfig options;

    /** Number of {@link #lookup(RowData)} calls on this instance; created in {@code open}. */
    private transient AtomicInteger localHttpCallCounter;

    /** Client that performs the actual pull; created in {@code open}. */
    private transient PollingClient<RowData> client;

    /**
     * Stores the collaborators without touching them; all initialisation work
     * is deferred to {@link #open(FunctionContext)}.
     *
     * @param pollingClientFactory  factory for the polling client
     * @param responseSchemaDecoder decoder passed to the polling client factory
     * @param lookupRow             lookup row description kept for later inspection
     * @param options               lookup configuration passed to the polling client factory
     */
    public HttpTableLookupFunction(
            PollingClientFactory<RowData> pollingClientFactory,
            DeserializationSchema<RowData> responseSchemaDecoder,
            LookupRow lookupRow,
            HttpLookupConfig options) {
        this.pollingClientFactory = pollingClientFactory;
        this.responseSchemaDecoder = responseSchemaDecoder;
        this.lookupRow = lookupRow;
        this.options = options;
    }

    /**
     * Prepares this function for use: opens the response decoder, resets the
     * call counter, creates the polling client, registers the
     * {@code http-table-lookup-call-counter} gauge and finally opens the client.
     *
     * @param context function context that supplies the metric group and is
     *                forwarded to the polling client
     * @throws Exception if any of the initialisation steps fails
     */
    public void open(FunctionContext context) throws Exception {
        responseSchemaDecoder.open(
                SerializationSchemaUtils.createDeserializationInitContext(
                        HttpTableLookupFunction.class));

        localHttpCallCounter = new AtomicInteger();
        client = pollingClientFactory.createPollClient(options, responseSchemaDecoder);

        // The lambda reads the field each time the gauge is sampled, so it always
        // reports the counter currently held by this instance.
        context.getMetricGroup()
                .gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.get());

        client.open(context);
    }

    /**
     * Counts the call and pulls the rows matching the given key from the polling client.
     *
     * @param keyRow lookup key, forwarded unchanged to the client
     * @return whatever the polling client returns for {@code keyRow}
     */
    public Collection<RowData> lookup(RowData keyRow) {
        // Count first: the call is recorded even if the pull below throws.
        localHttpCallCounter.incrementAndGet();
        return client.pull(keyRow);
    }

    /**
     * @return the lookup row description given to the constructor
     */
    @Generated
    LookupRow getLookupRow() {
        return lookupRow;
    }

    /**
     * @return the lookup configuration given to the constructor
     */
    @Generated
    HttpLookupConfig getOptions() {
        return options;
    }
}

