/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.service;

import java.net.InetSocketAddress;
import java.util.TreeMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

public class QueryAddressRegister
extends RichSinkFunction<InternalRow> {
    private final ServiceManager serviceManager;
    private transient int numberExecutors;
    private transient TreeMap<Integer, InetSocketAddress> executors;

    public QueryAddressRegister(Table table) {
        this.serviceManager = ((FileStoreTable)table).store().newServiceManager();
    }

    public void open(Configuration parameters) throws Exception {
        this.executors = new TreeMap();
    }

    public void invoke(InternalRow row, SinkFunction.Context context) {
        int numberExecutors = row.getInt(0);
        if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) {
            throw new IllegalArgumentException(String.format("Number Executors can not be changed! Old %s , New %s .", this.numberExecutors, numberExecutors));
        }
        this.numberExecutors = numberExecutors;
        int executorId = row.getInt(1);
        String hostname = row.getString(2).toString();
        int port = row.getInt(3);
        this.executors.put(executorId, new InetSocketAddress(hostname, port));
        if (this.executors.size() == numberExecutors) {
            this.serviceManager.resetService("primary-key-lookup", this.executors.values().toArray(new InetSocketAddress[0]));
        }
    }

    public void close() throws Exception {
        super.close();
        this.serviceManager.deleteService("primary-key-lookup");
    }
}

