/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.common.dim;

import com.alibaba.ververica.connectors.common.dim.cache.AllCache;
import com.alibaba.ververica.connectors.common.dim.cache.Cache;
import com.alibaba.ververica.connectors.common.dim.cache.CacheFactory;
import com.alibaba.ververica.connectors.common.dim.cache.CacheStrategy;
import com.alibaba.ververica.connectors.common.dim.reload.CacheAllReloadConf;
import com.alibaba.ververica.connectors.common.dim.reload.SerializableRunnable;
import com.alibaba.ververica.connectors.planner.CodeGenerator;
import com.alibaba.ververica.connectors.planner.PlannerDelegateLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.source.KeyGroupPruner;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DimJoinFetcher<V>
extends AbstractRichFunction {
    private static final Logger LOG = LoggerFactory.getLogger(DimJoinFetcher.class);
    public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "cache";
    protected final String sqlTableName;
    protected final RowType dimRowType;
    protected final String[] lookupKeys;
    protected final CacheStrategy cacheStrategy;
    protected SerializableRunnable cacheReloadRunner;
    protected volatile CacheAllReloadConf reloadConf;
    protected KeyGroupPruner<RowData> cachePartitioner;
    protected transient CacheFactory<Object, V> cacheFactory;
    protected transient Cache<Object, V> cache;
    protected transient ScheduledExecutorService reloadExecutor;
    protected volatile transient AllCache<Object, V> allCacheHandler;
    private transient String cacheId;
    private List<Integer> sourceKeys;
    private List<Integer> targetKeys;
    private LogicalType[] keyTypes;
    private transient Projection<RowData, BinaryRowData> srcKeyProjection;
    private transient Projection<RowData, BinaryRowData> cacheKeyProjection;
    private RowDataSerializer cacheKeySer;
    protected RowDataSerializer cacheRowSer;
    private transient CacheMetricGroup cacheMetricGroup;

    protected DimJoinFetcher(String sqlTableName, RowType rowType, String[] lookupKeys, CacheStrategy cacheStrategy) {
        this(sqlTableName, rowType, lookupKeys, cacheStrategy, null);
    }

    protected DimJoinFetcher(String sqlTableName, RowType dimRowType, String[] lookupKeys, CacheStrategy cacheStrategy, KeyGroupPruner<RowData> cachePartitioner) {
        Preconditions.checkArgument((null != sqlTableName ? 1 : 0) != 0, (Object)"sqlTableName cannot be null!");
        Preconditions.checkArgument((null != lookupKeys ? 1 : 0) != 0, (Object)"lookupKeys cannot be null!");
        Preconditions.checkArgument((null != dimRowType ? 1 : 0) != 0, (Object)"dimRowType cannot be null!");
        Preconditions.checkArgument((null != cacheStrategy ? 1 : 0) != 0, (Object)"cacheStrategy cannot be null!");
        this.dimRowType = dimRowType;
        this.lookupKeys = lookupKeys;
        this.sqlTableName = sqlTableName;
        this.cacheStrategy = cacheStrategy;
        this.cachePartitioner = cachePartitioner;
        this.sourceKeys = new ArrayList<Integer>();
        this.targetKeys = new ArrayList<Integer>();
        this.cacheRowSer = new RowDataSerializer(dimRowType);
        this.keyTypes = new LogicalType[lookupKeys.length];
        String[] fieldNames = dimRowType.getFieldNames().toArray(new String[0]);
        for (int i = 0; i < lookupKeys.length; ++i) {
            this.sourceKeys.add(i);
            int targetIdx = this.getColumnIndex(lookupKeys[i], fieldNames);
            if (targetIdx < 0) {
                throw new TableException("Column: " + lookupKeys[i] + " doesn't exists.");
            }
            this.targetKeys.add(targetIdx);
            this.keyTypes[i] = dimRowType.getTypeAt(targetIdx);
        }
        this.cacheKeySer = new RowDataSerializer(this.keyTypes);
    }

    public void setAllCacheReloadRunner(SerializableRunnable cacheReloadRunner, CacheAllReloadConf reloadConf) {
        if (this.cacheStrategy.isAllCache()) {
            Objects.requireNonNull(cacheReloadRunner);
            Objects.requireNonNull(reloadConf);
            Objects.requireNonNull(reloadConf.timeRangeBlackList);
            this.cacheReloadRunner = cacheReloadRunner;
            this.reloadConf = reloadConf;
        }
    }

    public abstract void openConnection(Configuration var1);

    public abstract void closeConnection();

    public abstract boolean hasPrimaryKey();

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.openConnection(parameters);
        if (this.cacheStrategy.isBinaryCacheEnabled()) {
            this.srcKeyProjection = this.genProjection(this.keyTypes, this.sourceKeys.stream().mapToInt(i -> i).toArray());
            LogicalType[] dimTypes = (LogicalType[])this.dimRowType.getFields().stream().map(RowType.RowField::getType).toArray(LogicalType[]::new);
            this.cacheKeyProjection = this.genProjection(dimTypes, this.targetKeys.stream().mapToInt(i -> i).toArray());
        }
        this.cacheId = this.sqlTableName + ": " + Arrays.toString(this.lookupKeys) + this.dimRowType.getFieldNames().toString();
        if (this.cachePartitioner != null) {
            FunctionContext functionContext = new FunctionContext(this.getRuntimeContext());
            this.cachePartitioner.open(functionContext);
            this.cacheId = this.cacheId + "-" + functionContext.getIndexOfThisSubtask();
        }
        String cacheName = this.hasPrimaryKey() ? "one2oneCahce" : "one2manyCache";
        LOG.info("table " + this.sqlTableName + " preparing " + cacheName);
        this.cacheFactory = CacheFactory.getInstance();
        this.cache = this.cacheFactory.getCache(this.cacheId, this.cacheStrategy, this.hasPrimaryKey(), this.cacheKeySer, this.cacheRowSer);
        LOG.info("table " + this.sqlTableName + ", strategy:" + this.cacheStrategy);
        this.cacheMetricGroup = new InternalCacheMetricGroup((MetricGroup)this.getRuntimeContext().getMetricGroup(), LOOKUP_CACHE_METRIC_GROUP_NAME);
        if (this.cacheStrategy.isAllCache()) {
            this.allCacheHandler = (AllCache)this.cache;
            this.allCacheHandler.setCachePartitioner(this.cachePartitioner);
            int idx = this.allCacheHandler.counter.incrementAndGet();
            LOG.info("the {}th started lookup join worker on {}.", (Object)idx, (Object)this.sqlTableName);
            if (this.allCacheHandler.isRegisteredTimer.compareAndSet(false, true)) {
                LOG.info("Subtask-{} hold the reloading schedule future for table: {}", (Object)idx, (Object)this.sqlTableName);
                ScheduledFuture<?> future = this.scheduleCacheLoaderRunner(cacheName + "-reload-" + idx);
                this.allCacheHandler.setScheduledFuture(future);
            }
            while (!this.allCacheHandler.isLoadedOrThrowException()) {
                Thread.sleep(10L);
            }
        }
        this.cache.open(this.cacheMetricGroup);
    }

    private ScheduledFuture<?> scheduleCacheLoaderRunner(String threadName) {
        this.reloadExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat(threadName).setDaemon(true).build());
        return this.reloadExecutor.scheduleWithFixedDelay(new Thread(this.cacheReloadRunner), 0L, this.reloadConf.ttlMs, TimeUnit.MILLISECONDS);
    }

    public void close() throws Exception {
        if (this.cacheStrategy.isAllCache() && this.allCacheHandler.counter.decrementAndGet() == 0) {
            LOG.info("start to cancel reloading thread...");
            ScheduledFuture<?> future = this.allCacheHandler.getScheduledFuture();
            if (future != null) {
                LOG.info("start to cancel reloading thread for table: {}.", (Object)this.sqlTableName);
                future.cancel(true);
            }
            if (null != this.reloadExecutor && !this.reloadExecutor.isShutdown()) {
                this.reloadExecutor.shutdownNow();
                this.reloadExecutor = null;
            }
            this.allCacheHandler.isRegisteredTimer.compareAndSet(true, false);
            this.removeCache();
        }
        LOG.info("start to close connection...");
        this.closeConnection();
        if (!this.cacheStrategy.isAllCache()) {
            this.removeCache();
        }
        super.close();
    }

    private void removeCache() {
        LOG.info("start to release cache of table: {}...", (Object)this.sqlTableName);
        if (this.cacheFactory != null) {
            LOG.info("table " + this.sqlTableName + " cache removing...");
            this.cacheFactory.removeCache(this.cacheId);
            LOG.info("table " + this.sqlTableName + " cache removed");
        }
    }

    protected Object getSourceKey(RowData source) {
        return this.getKey(source, this.sourceKeys, this.keyTypes, this.srcKeyProjection, false);
    }

    protected Object prepareCacheKey(RowData target) {
        return this.getKey(target, this.targetKeys, this.keyTypes, this.cacheKeyProjection, true);
    }

    public Object getKey(RowData input, List<Integer> keys, LogicalType[] types, Projection<RowData, BinaryRowData> projection, boolean needCopy) {
        if (keys.size() == 1) {
            return this.safeGet(input, keys.get(0), types[0]);
        }
        if (projection != null) {
            if (needCopy) {
                return ((BinaryRowData)projection.apply(input)).copy();
            }
            return projection.apply(input);
        }
        GenericRowData key = new GenericRowData(keys.size());
        for (int i = 0; i < keys.size(); ++i) {
            Object field = this.safeGet(input, keys.get(i), types[i]);
            if (field == null) {
                return null;
            }
            key.setField(i, field);
        }
        return key;
    }

    public Object getKey(RowData input, List<Integer> keys, LogicalType[] types) {
        return this.getKey(input, keys, types, null, false);
    }

    @VisibleForTesting
    public AllCache<Object, V> getAllCacheHandler() {
        return this.allCacheHandler;
    }

    protected Projection<RowData, BinaryRowData> genProjection(LogicalType[] inputTypes, int[] mapping) {
        GeneratedProjection genProjection = PlannerDelegateLoader.discover(CodeGenerator.class).projectionCodeGeneratorToBinaryRow(new TableConfig(), Thread.currentThread().getContextClassLoader(), inputTypes, mapping);
        return (Projection)genProjection.newInstance(Thread.currentThread().getContextClassLoader());
    }

    protected Object safeGet(RowData inRow, int ordinal, LogicalType type) {
        if (inRow != null && !inRow.isNullAt(ordinal)) {
            RowData.FieldGetter fieldGetter = RowData.createFieldGetter((LogicalType)type, (int)ordinal);
            return fieldGetter.getFieldOrNull(inRow);
        }
        return null;
    }

    protected int getColumnIndex(String column, String[] columnNames) {
        for (int i = 0; i < columnNames.length; ++i) {
            if (!column.equals(columnNames[i])) continue;
            return i;
        }
        return -1;
    }
}

