/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.org.apache.hadoop.hbase.procedure2.store.region;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.org.apache.hadoop.hbase.Cell;
import org.apache.hudi.org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hudi.org.apache.hadoop.hbase.HConstants;
import org.apache.hudi.org.apache.hadoop.hbase.Server;
import org.apache.hudi.org.apache.hadoop.hbase.client.Delete;
import org.apache.hudi.org.apache.hadoop.hbase.client.Mutation;
import org.apache.hudi.org.apache.hadoop.hbase.client.Put;
import org.apache.hudi.org.apache.hadoop.hbase.client.Scan;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hudi.org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hudi.org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import org.apache.hudi.org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
import org.apache.hudi.org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
import org.apache.hudi.org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import org.apache.hudi.org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hudi.org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hudi.org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hudi.org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hudi.org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hudi.org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hudi.org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hudi.org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hudi.org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
import org.apache.hudi.org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hudi.org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hudi.org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hudi.org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hudi.org.apache.hadoop.hbase.util.Bytes;
import org.apache.hudi.org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class RegionProcedureStore
extends ProcedureStoreBase {
    private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
    static final byte[] PROC_QUALIFIER = Bytes.toBytes("d");
    private final Server server;
    private final LeaseRecovery leaseRecovery;
    final MasterRegion region;
    private int numThreads;
    private static final ImmutableSet<Class<?>> UNSUPPORTED_PROCEDURES = ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class, MoveRegionProcedure.class);

    public RegionProcedureStore(Server server, MasterRegion region, LeaseRecovery leaseRecovery) {
        this.server = server;
        this.region = region;
        this.leaseRecovery = leaseRecovery;
    }

    public void start(int numThreads) throws IOException {
        if (!this.setRunning(true)) {
            return;
        }
        LOG.info("Starting the Region Procedure Store, number threads={}", (Object)numThreads);
        this.numThreads = numThreads;
    }

    public void stop(boolean abort) {
        if (!this.setRunning(false)) {
            return;
        }
        LOG.info("Stopping the Region Procedure Store, isAbort={}", (Object)abort);
    }

    public int getNumThreads() {
        return this.numThreads;
    }

    public int setRunningProcedureCount(int count) {
        return count;
    }

    private void checkUnsupportedProcedure(Map<Class<?>, List<Procedure<?>>> procsByType) throws HBaseIOException {
        for (Class clazz : UNSUPPORTED_PROCEDURES) {
            List<Procedure<?>> procs = procsByType.get(clazz);
            if (procs == null) continue;
            LOG.error("Unsupported procedure type {} found, please rollback your master to the old version to finish them, and then try to upgrade again. See https://hbase.apache.org/book.html#upgrade2.2 for more details. The full procedure list: {}", (Object)clazz, (Object)procs);
            throw new HBaseIOException("Unsupported procedure type " + clazz + " found");
        }
        if (procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream().map(p -> (ServerCrashProcedure)p).anyMatch(ServerCrashProcedure::isInRecoverMetaState)) {
            LOG.error("At least one ServerCrashProcedure is going to schedule a RecoverMetaProcedure, which is not supported any more. Please rollback your master to the old version to finish them, and then try to upgrade again. See https://hbase.apache.org/book.html#upgrade2.2 for more details.");
            throw new HBaseIOException("Unsupported procedure state found for ServerCrashProcedure");
        }
    }

    private void tryMigrate(FileSystem fs) throws IOException {
        Configuration conf = this.server.getConfiguration();
        Path procWALDir = new Path(CommonFSUtils.getWALRootDir(conf), "MasterProcWALs");
        if (!fs.exists(procWALDir)) {
            return;
        }
        LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", (Object)procWALDir);
        WALProcedureStore store = new WALProcedureStore(conf, this.leaseRecovery);
        store.start(this.numThreads);
        store.recoverLease();
        final MutableLong maxProcIdSet = new MutableLong(-1L);
        final ArrayList procs = new ArrayList();
        final HashMap activeProcsByType = new HashMap();
        store.load(new ProcedureStore.ProcedureLoader(){

            public void setMaxProcId(long maxProcId) {
                maxProcIdSet.setValue(maxProcId);
            }

            public void load(ProcedureStore.ProcedureIterator procIter) throws IOException {
                while (procIter.hasNext()) {
                    Procedure proc = procIter.next();
                    procs.add(proc);
                    if (proc.isFinished()) continue;
                    activeProcsByType.computeIfAbsent(proc.getClass(), k -> new ArrayList()).add(proc);
                }
            }

            public void handleCorrupted(ProcedureStore.ProcedureIterator procIter) throws IOException {
                long corruptedCount = 0L;
                while (procIter.hasNext()) {
                    LOG.error("Corrupted procedure {}", (Object)procIter.next());
                    ++corruptedCount;
                }
                if (corruptedCount > 0L) {
                    throw new IOException("There are " + corruptedCount + " corrupted procedures when migrating from the old WAL based store to the new region based store, please fix them before upgrading again.");
                }
            }
        });
        this.checkUnsupportedProcedure(activeProcsByType);
        MutableLong maxProcIdFromProcs = new MutableLong(-1L);
        for (Procedure proc : procs) {
            this.update(proc);
            if (proc.getProcId() <= maxProcIdFromProcs.longValue()) continue;
            maxProcIdFromProcs.setValue(proc.getProcId());
        }
        LOG.info("Migrated {} existing procedures from the old storage format.", (Object)procs.size());
        LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}", (Object)maxProcIdSet.longValue(), (Object)maxProcIdFromProcs.longValue());
        if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
            if (maxProcIdSet.longValue() > 0L) {
                this.region.update((HRegion r) -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY)));
            }
        } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
            LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
        }
        store.stop(false);
        if (!fs.delete(procWALDir, true)) {
            throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir);
        }
        LOG.info("Migration of WALProcedureStore finished");
    }

    public void recoverLease() throws IOException {
        LOG.info("Starting Region Procedure Store lease recovery...");
        FileSystem fs = CommonFSUtils.getWALFileSystem(this.server.getConfiguration());
        this.tryMigrate(fs);
    }

    public void load(ProcedureStore.ProcedureLoader loader) throws IOException {
        ArrayList<ProcedureProtos.Procedure> procs = new ArrayList<ProcedureProtos.Procedure>();
        long maxProcId = 0L;
        try (RegionScanner scanner = this.region.getScanner(new Scan().addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER));){
            boolean moreRows;
            ArrayList<Cell> cells = new ArrayList<Cell>();
            do {
                moreRows = scanner.next(cells);
                if (cells.isEmpty()) continue;
                Cell cell = (Cell)cells.get(0);
                cells.clear();
                maxProcId = Math.max(maxProcId, Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
                if (cell.getValueLength() <= 0) continue;
                ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser().parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                procs.add(proto);
            } while (moreRows);
        }
        loader.setMaxProcId(maxProcId);
        ProcedureTree tree = ProcedureTree.build(procs);
        loader.load(tree.getValidProcs());
        loader.handleCorrupted(tree.getCorruptedProcs());
    }

    private void serializePut(Procedure<?> proc, List<Mutation> mutations, List<byte[]> rowsToLock) throws IOException {
        ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
        byte[] row = Bytes.toBytes(proc.getProcId());
        mutations.add(new Put(row).addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray()));
        rowsToLock.add(row);
    }

    private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) {
        byte[] row = Bytes.toBytes(procId);
        mutations.add(new Put(row).addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY));
        rowsToLock.add(row);
    }

    private void runWithoutRpcCall(Runnable runnable) {
        Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
        try {
            runnable.run();
        }
        finally {
            rpcCall.ifPresent(RpcServer::setCurrentCall);
        }
    }

    public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
        if (subProcs == null || subProcs.length == 0) {
            this.update(proc);
            return;
        }
        ArrayList mutations = new ArrayList(subProcs.length + 1);
        ArrayList rowsToLock = new ArrayList(subProcs.length + 1);
        this.runWithoutRpcCall(() -> {
            try {
                this.serializePut(proc, mutations, rowsToLock);
                for (Procedure subProc : subProcs) {
                    this.serializePut(subProc, mutations, rowsToLock);
                }
                this.region.update((HRegion r) -> r.mutateRowsWithLocks(mutations, rowsToLock, 0L, 0L));
            }
            catch (IOException e) {
                LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc, Arrays.toString(subProcs), e);
                throw new UncheckedIOException(e);
            }
        });
    }

    public void insert(Procedure<?>[] procs) {
        ArrayList mutations = new ArrayList(procs.length);
        ArrayList rowsToLock = new ArrayList(procs.length);
        this.runWithoutRpcCall(() -> {
            try {
                for (Procedure proc : procs) {
                    this.serializePut(proc, mutations, rowsToLock);
                }
                this.region.update((HRegion r) -> r.mutateRowsWithLocks(mutations, rowsToLock, 0L, 0L));
            }
            catch (IOException e) {
                LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", (Object)Arrays.toString(procs), (Object)e);
                throw new UncheckedIOException(e);
            }
        });
    }

    public void update(Procedure<?> proc) {
        this.runWithoutRpcCall(() -> {
            try {
                ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure((Procedure)proc);
                this.region.update((HRegion r) -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray())));
            }
            catch (IOException e) {
                LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", (Object)proc, (Object)e);
                throw new UncheckedIOException(e);
            }
        });
    }

    public void delete(long procId) {
        try {
            this.region.update((HRegion r) -> r.put(new Put(Bytes.toBytes(procId)).addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY)));
        }
        catch (IOException e) {
            LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", (Object)procId, (Object)e);
            throw new UncheckedIOException(e);
        }
    }

    public void delete(Procedure<?> parentProc, long[] subProcIds) {
        ArrayList<Mutation> mutations = new ArrayList<Mutation>(subProcIds.length + 1);
        ArrayList<byte[]> rowsToLock = new ArrayList<byte[]>(subProcIds.length + 1);
        try {
            this.serializePut(parentProc, mutations, rowsToLock);
            for (long subProcId : subProcIds) {
                this.serializeDelete(subProcId, mutations, rowsToLock);
            }
            this.region.update((HRegion r) -> r.mutateRowsWithLocks(mutations, rowsToLock, 0L, 0L));
        }
        catch (IOException e) {
            LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc, Arrays.toString(subProcIds), e);
            throw new UncheckedIOException(e);
        }
    }

    public void delete(long[] procIds, int offset, int count) {
        if (count == 0) {
            return;
        }
        if (count == 1) {
            this.delete(procIds[offset]);
            return;
        }
        ArrayList<Mutation> mutations = new ArrayList<Mutation>(count);
        ArrayList<byte[]> rowsToLock = new ArrayList<byte[]>(count);
        for (int i = 0; i < count; ++i) {
            long procId = procIds[offset + i];
            this.serializeDelete(procId, mutations, rowsToLock);
        }
        try {
            this.region.update((HRegion r) -> r.mutateRowsWithLocks(mutations, rowsToLock, 0L, 0L));
        }
        catch (IOException e) {
            LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", (Object)Arrays.toString(procIds), (Object)e);
            throw new UncheckedIOException(e);
        }
    }

    public void cleanup() {
        ArrayList<Cell> cells = new ArrayList<Cell>();
        try (RegionScanner scanner = this.region.getScanner(new Scan().addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER).setReversed(true));){
            boolean moreRows = scanner.next(cells);
            if (cells.isEmpty()) {
                return;
            }
            cells.clear();
            while (moreRows) {
                moreRows = scanner.next(cells);
                if (cells.isEmpty()) continue;
                Cell cell = (Cell)cells.get(0);
                cells.clear();
                if (cell.getValueLength() != 0) continue;
                this.region.update((HRegion r) -> r.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())));
            }
        }
        catch (IOException e) {
            LOG.warn("Failed to clean up delete procedures", e);
        }
    }
}

