/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.replication.regionserver.DefaultSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.replication.regionserver.HFileReplicator;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSink;
import org.apache.hadoop.hbase.replication.regionserver.SourceFSConfigurationProvider;
import org.apache.hadoop.hbase.replication.regionserver.WALEntrySinkFilter;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.protobuf.ProtocolStringList;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class ReplicationSink {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
    private final Configuration conf;
    private volatile Connection sharedHtableCon;
    private final MetricsSink metrics;
    private final AtomicLong totalReplicatedEdits = new AtomicLong();
    private final Object sharedHtableConLock = new Object();
    private long hfilesReplicated = 0L;
    private SourceFSConfigurationProvider provider;
    private WALEntrySinkFilter walEntrySinkFilter;

    public ReplicationSink(Configuration conf, Stoppable stopper) throws IOException {
        this.conf = HBaseConfiguration.create(conf);
        this.decorateConf();
        this.metrics = new MetricsSink();
        this.walEntrySinkFilter = this.setupWALEntrySinkFilter();
        String className = conf.get("hbase.replication.source.fs.conf.provider", DefaultSourceFSConfigurationProvider.class.getCanonicalName());
        try {
            Class<?> c = Class.forName(className);
            this.provider = (SourceFSConfigurationProvider)c.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Configured source fs configuration provider class " + className + " throws error.", e);
        }
    }

    private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
        Class walEntryFilterClass = this.conf.getClass("hbase.replication.sink.walentrysinkfilter", null);
        WALEntrySinkFilter filter = null;
        try {
            filter = walEntryFilterClass == null ? null : (WALEntrySinkFilter)walEntryFilterClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        }
        catch (Exception e) {
            LOG.warn("Failed to instantiate " + walEntryFilterClass);
        }
        if (filter != null) {
            filter.init(this.getConnection());
        }
        return filter;
    }

    private void decorateConf() {
        this.conf.setInt("hbase.client.retries.number", this.conf.getInt("replication.sink.client.retries.number", 4));
        this.conf.setInt("hbase.client.operation.timeout", this.conf.getInt("replication.sink.client.ops.timeout", 10000));
        String replicationCodec = this.conf.get("hbase.replication.rpc.codec");
        if (StringUtils.isNotEmpty(replicationCodec)) {
            this.conf.set("hbase.client.rpc.codec", replicationCodec);
        }
        if (this.conf.get("hbase.client.zookeeper.quorum") != null) {
            this.conf.unset("hbase.client.zookeeper.quorum");
        }
    }

    public void replicateEntries(List<AdminProtos.WALEntry> entries, CellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException {
        if (entries.isEmpty()) {
            return;
        }
        try {
            long totalReplicated = 0L;
            TreeMap rowMap = new TreeMap();
            HashMap<ProtocolStringList, HashMap<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
            for (AdminProtos.WALEntry wALEntry : entries) {
                TableName table = TableName.valueOf(wALEntry.getKey().getTableName().toByteArray());
                if (this.walEntrySinkFilter != null && this.walEntrySinkFilter.filter(table, wALEntry.getKey().getWriteTime())) {
                    int count = wALEntry.getAssociatedCellCount();
                    for (int i = 0; i < count; ++i) {
                        if (cells.advance()) continue;
                        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
                    }
                    continue;
                }
                Cell previousCell = null;
                Mutation mutation = null;
                int count = wALEntry.getAssociatedCellCount();
                for (int i = 0; i < count; ++i) {
                    if (!cells.advance()) {
                        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
                    }
                    Cell cell = cells.current();
                    if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
                        HashMap<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
                        WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
                        if (!bld.getReplicate()) continue;
                        if (bulkLoadsPerClusters == null) {
                            bulkLoadsPerClusters = new HashMap<ProtocolStringList, HashMap<String, List<Pair<byte[], List<String>>>>>();
                        }
                        if ((bulkLoadHFileMap = (HashMap<String, List<Pair<byte[], List<String>>>>)bulkLoadsPerClusters.get(bld.getClusterIdsList())) == null) {
                            bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>();
                            bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
                        }
                        this.buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
                        continue;
                    }
                    if (this.isNewRowOrType(previousCell, cell)) {
                        mutation = CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                        ArrayList<UUID> clusterIds = new ArrayList<UUID>(wALEntry.getKey().getClusterIdsList().size());
                        for (HBaseProtos.UUID clusterId : wALEntry.getKey().getClusterIdsList()) {
                            clusterIds.add(this.toUUID(clusterId));
                        }
                        mutation.setClusterIds(clusterIds);
                        this.addToHashMultiMap(rowMap, table, clusterIds, mutation);
                    }
                    if (CellUtil.isDelete(cell)) {
                        ((Delete)mutation).add(cell);
                    } else {
                        ((Put)mutation).add(cell);
                    }
                    previousCell = cell;
                }
                ++totalReplicated;
            }
            if (!rowMap.isEmpty()) {
                LOG.debug("Started replicating mutations.");
                for (Map.Entry entry : rowMap.entrySet()) {
                    this.batch((TableName)entry.getKey(), ((Map)entry.getValue()).values());
                }
                LOG.debug("Finished replicating mutations.");
            }
            if (bulkLoadsPerClusters != null) {
                for (Map.Entry entry : bulkLoadsPerClusters.entrySet()) {
                    Map bulkLoadHFileMap = (Map)entry.getValue();
                    if (bulkLoadHFileMap == null || bulkLoadHFileMap.isEmpty()) continue;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Started replicating bulk loaded data from cluster ids: {}.", (Object)((List)entry.getKey()).toString());
                    }
                    HFileReplicator hFileReplicator = new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId), sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, this.conf, this.getConnection(), (List)entry.getKey());
                    hFileReplicator.replicate();
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug("Finished replicating bulk loaded data from cluster id: {}", (Object)((List)entry.getKey()).toString());
                }
            }
            int size = entries.size();
            this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
            this.metrics.applyBatch((long)size + this.hfilesReplicated, this.hfilesReplicated);
            this.totalReplicatedEdits.addAndGet(totalReplicated);
        }
        catch (IOException ex) {
            LOG.error("Unable to accept edit because:", (Throwable)ex);
            throw ex;
        }
    }

    private void buildBulkLoadHFileMap(Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table, WALProtos.BulkLoadDescriptor bld) throws IOException {
        List<WALProtos.StoreDescriptor> storesList = bld.getStoresList();
        int storesSize = storesList.size();
        for (int j = 0; j < storesSize; ++j) {
            WALProtos.StoreDescriptor storeDescriptor = storesList.get(j);
            ProtocolStringList storeFileList = storeDescriptor.getStoreFileList();
            int storeFilesSize = storeFileList.size();
            this.hfilesReplicated += (long)storeFilesSize;
            for (int k = 0; k < storeFilesSize; ++k) {
                byte[] family = storeDescriptor.getFamilyName().toByteArray();
                String pathToHfileFromNS = this.getHFilePath(table, bld, (String)storeFileList.get(k), family);
                String tableName = table.getNameWithNamespaceInclAsString();
                List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
                if (familyHFilePathsList != null) {
                    boolean foundFamily = false;
                    for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
                        if (!Bytes.equals(familyHFilePathsPair.getFirst(), family)) continue;
                        familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
                        foundFamily = true;
                        break;
                    }
                    if (foundFamily) continue;
                    this.addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
                    continue;
                }
                this.addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
            }
        }
    }

    private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS, List<Pair<byte[], List<String>>> familyHFilePathsList) {
        ArrayList<String> hfilePaths = new ArrayList<String>(1);
        hfilePaths.add(pathToHfileFromNS);
        familyHFilePathsList.add(new Pair(family, hfilePaths));
    }

    private void addNewTableEntryInMap(Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family, String pathToHfileFromNS, String tableName) {
        ArrayList<String> hfilePaths = new ArrayList<String>(1);
        hfilePaths.add(pathToHfileFromNS);
        Pair newFamilyHFilePathsPair = new Pair(family, hfilePaths);
        ArrayList newFamilyHFilePathsList = new ArrayList();
        newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
        bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
    }

    private String getHFilePath(TableName table, WALProtos.BulkLoadDescriptor bld, String storeFile, byte[] family) {
        return new StringBuilder(100).append(table.getNamespaceAsString()).append("/").append(table.getQualifierAsString()).append("/").append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append("/").append(Bytes.toString(family)).append("/").append(storeFile).toString();
    }

    private boolean isNewRowOrType(Cell previousCell, Cell cell) {
        return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() || !CellUtil.matchingRows(previousCell, cell);
    }

    private UUID toUUID(HBaseProtos.UUID uuid) {
        return new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
    }

    private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
        List<V> values;
        Map<K2, List<List<V>>> innerMap = map.get(key1);
        if (innerMap == null) {
            innerMap = new HashMap<K2, List<V>>();
            map.put(key1, innerMap);
        }
        if ((values = innerMap.get(key2)) == null) {
            values = new ArrayList<V>();
            innerMap.put(key2, values);
        }
        values.add(value);
        return values;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopReplicationSinkServices() {
        block6: {
            try {
                if (this.sharedHtableCon == null) break block6;
                Object object = this.sharedHtableConLock;
                synchronized (object) {
                    if (this.sharedHtableCon != null) {
                        this.sharedHtableCon.close();
                        this.sharedHtableCon = null;
                    }
                }
            }
            catch (IOException e) {
                LOG.warn("IOException while closing the connection", (Throwable)e);
            }
        }
    }

    protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
        if (allRows.isEmpty()) {
            return;
        }
        try (Table table = null;){
            Connection connection = this.getConnection();
            table = connection.getTable(tableName);
            for (List<Row> rows : allRows) {
                table.batch(rows, null);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Connection getConnection() throws IOException {
        Connection connection = this.sharedHtableCon;
        if (connection == null) {
            Object object = this.sharedHtableConLock;
            synchronized (object) {
                connection = this.sharedHtableCon;
                if (connection == null) {
                    connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
                }
            }
        }
        return connection;
    }

    public String getStats() {
        return this.totalReplicatedEdits.get() == 0L ? "" : "Sink: age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() + ", total replicated edits: " + this.totalReplicatedEdits;
    }

    public MetricsSink getSinkMetrics() {
        return this.metrics;
    }
}

