/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.service;

import java.net.InetSocketAddress;
import java.util.TreeMap;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

public class QueryAddressRegister
implements Sink<InternalRow> {
    private final ServiceManager serviceManager;

    public QueryAddressRegister(Table table) {
        this.serviceManager = ((FileStoreTable)table).store().newServiceManager();
    }

    public SinkWriter<InternalRow> createWriter(Sink.InitContext context) {
        return new QueryAddressRegisterSinkWriter(this.serviceManager);
    }

    public SinkWriter<InternalRow> createWriter(WriterInitContext context) {
        return new QueryAddressRegisterSinkWriter(this.serviceManager);
    }

    private static class QueryAddressRegisterSinkWriter
    implements SinkWriter<InternalRow> {
        private final ServiceManager serviceManager;
        private final TreeMap<Integer, InetSocketAddress> executors;
        private int numberExecutors;

        private QueryAddressRegisterSinkWriter(ServiceManager serviceManager) {
            this.serviceManager = serviceManager;
            this.executors = new TreeMap();
        }

        public void write(InternalRow row, SinkWriter.Context context) {
            int numberExecutors = row.getInt(0);
            if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) {
                throw new IllegalArgumentException(String.format("Number Executors can not be changed! Old %s , New %s .", this.numberExecutors, numberExecutors));
            }
            this.numberExecutors = numberExecutors;
            int executorId = row.getInt(1);
            String hostname = row.getString(2).toString();
            int port = row.getInt(3);
            this.executors.put(executorId, new InetSocketAddress(hostname, port));
            if (this.executors.size() == numberExecutors) {
                this.serviceManager.resetService("primary-key-lookup", this.executors.values().toArray(new InetSocketAddress[0]));
            }
        }

        public void flush(boolean endOfInput) {
        }

        public void close() {
            this.serviceManager.deleteService("primary-key-lookup");
        }
    }
}

