/*
 * Decompiled with CFR 0.152.
 */
package com.hotels.bdp.circustrain.core.replica;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.hotels.bdp.circustrain.api.CircusTrainException;
import com.hotels.bdp.circustrain.api.CircusTrainTableParameter;
import com.hotels.bdp.circustrain.api.ReplicaLocationManager;
import com.hotels.bdp.circustrain.api.SourceLocationManager;
import com.hotels.bdp.circustrain.api.conf.ReplicaCatalog;
import com.hotels.bdp.circustrain.api.conf.ReplicationMode;
import com.hotels.bdp.circustrain.api.conf.TableReplication;
import com.hotels.bdp.circustrain.api.data.DataManipulator;
import com.hotels.bdp.circustrain.api.event.ReplicaCatalogListener;
import com.hotels.bdp.circustrain.api.listener.HousekeepingListener;
import com.hotels.bdp.circustrain.core.HiveEndpoint;
import com.hotels.bdp.circustrain.core.PartitionsAndStatistics;
import com.hotels.bdp.circustrain.core.TableAndStatistics;
import com.hotels.bdp.circustrain.core.event.EventUtils;
import com.hotels.bdp.circustrain.core.replica.CleanupLocationManager;
import com.hotels.bdp.circustrain.core.replica.CleanupLocationManagerFactory;
import com.hotels.bdp.circustrain.core.replica.DestinationNotReplicaException;
import com.hotels.bdp.circustrain.core.replica.FullReplicationReplicaLocationManager;
import com.hotels.bdp.circustrain.core.replica.InvalidReplicationModeException;
import com.hotels.bdp.circustrain.core.replica.ReplicaTableFactory;
import com.hotels.bdp.circustrain.core.replica.TableType;
import com.hotels.bdp.circustrain.core.replica.hive.AlterTableService;
import com.hotels.bdp.circustrain.core.replica.hive.DropTableService;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;
import com.hotels.hcommon.hive.metastore.exception.MetaStoreClientException;
import com.hotels.hcommon.hive.metastore.util.LocationUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Replica-side {@link HiveEndpoint}: creates or alters the replica table and its partitions in the
 * replica metastore, validates that an existing destination table may be overwritten by a
 * replication, and registers superseded data locations with the {@link ReplicaLocationManager}.
 */
public class Replica extends HiveEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(Replica.class);

    /** Number of partitions sent to the metastore per request when no explicit size is supplied. */
    private static final int DEFAULT_PARTITION_BATCH_SIZE = 1000;

    private final ReplicaTableFactory tableFactory;
    private final HousekeepingListener housekeepingListener;
    private final ReplicaCatalogListener replicaCatalogListener;
    private final ReplicationMode replicationMode;
    private final TableReplication tableReplication;
    private final AlterTableService alterTableService;
    private final int partitionBatchSize;

    /**
     * Creates a replica endpoint that batches partition operations using the default batch size.
     */
    Replica(
            ReplicaCatalog replicaCatalog,
            HiveConf replicaHiveConf,
            Supplier<CloseableMetaStoreClient> replicaMetaStoreClientSupplier,
            ReplicaTableFactory replicaTableFactory,
            HousekeepingListener housekeepingListener,
            ReplicaCatalogListener replicaCatalogListener,
            TableReplication tableReplication,
            AlterTableService alterTableService) {
        this(replicaCatalog, replicaHiveConf, replicaMetaStoreClientSupplier, replicaTableFactory,
                housekeepingListener, replicaCatalogListener, tableReplication, alterTableService,
                DEFAULT_PARTITION_BATCH_SIZE);
    }

    /**
     * Creates a replica endpoint with an explicit partition batch size.
     *
     * @param partitionBatchSize maximum number of partitions (or partition statistics) sent to the
     *        metastore in a single create/alter/set-statistics request
     */
    @VisibleForTesting
    Replica(
            ReplicaCatalog replicaCatalog,
            HiveConf replicaHiveConf,
            Supplier<CloseableMetaStoreClient> replicaMetaStoreClientSupplier,
            ReplicaTableFactory replicaTableFactory,
            HousekeepingListener housekeepingListener,
            ReplicaCatalogListener replicaCatalogListener,
            TableReplication tableReplication,
            AlterTableService alterTableService,
            int partitionBatchSize) {
        super(replicaCatalog.getName(), replicaHiveConf, replicaMetaStoreClientSupplier);
        this.replicaCatalogListener = replicaCatalogListener;
        this.tableFactory = replicaTableFactory;
        this.housekeepingListener = housekeepingListener;
        this.replicationMode = tableReplication.getReplicationMode();
        this.tableReplication = tableReplication;
        this.partitionBatchSize = partitionBatchSize;
        this.alterTableService = alterTableService;
    }

    /**
     * Creates or alters the replica table from the given source table (table-level metadata only).
     * <p>
     * If a replica table already existed, has a location and has no partition keys, its previous
     * location is registered for clean-up together with the event id stored in its parameters.
     *
     * @throws MetaStoreClientException if the replica table cannot be created or altered
     * @throws CircusTrainException if the existing replica table has a different table type
     */
    public void updateMetadata(
            String eventId,
            TableAndStatistics sourceTable,
            String replicaDatabaseName,
            String replicaTableName,
            ReplicaLocationManager locationManager) {
        try (CloseableMetaStoreClient client = getMetaStoreClientSupplier().get()) {
            Optional<Table> oldReplicaTable = updateTableMetadata(client, eventId, sourceTable, replicaDatabaseName,
                    replicaTableName, locationManager.getTableLocation(), replicationMode);
            if (!oldReplicaTable.isPresent()) {
                // Table was freshly created: nothing was superseded, so nothing to clean up.
                return;
            }
            Table previousTable = oldReplicaTable.get();
            if (LocationUtils.hasLocation(previousTable) && isUnpartitioned(previousTable)) {
                Path oldLocation = LocationUtils.locationAsPath(previousTable);
                String oldEventId = previousTable.getParameters()
                        .get(CircusTrainTableParameter.REPLICATION_EVENT.parameterName());
                locationManager.addCleanUpLocation(oldEventId, oldLocation);
            }
        }
    }

    /** Returns {@code true} when the table declares no partition keys. */
    private boolean isUnpartitioned(Table table) {
        return table.getPartitionKeysSize() == 0;
    }

    /**
     * Creates or alters the replica table and then synchronises the given source partitions into it.
     * <p>
     * Source partitions whose values match an existing replica partition are altered (and the old
     * partition location, if any, is registered for clean-up); all others are created. Partition
     * column statistics are set for every source partition that has statistics. Metastore calls are
     * issued in batches of at most {@code partitionBatchSize} elements.
     *
     * @throws MetaStoreClientException if any metastore operation fails
     * @throws CircusTrainException if the existing replica table has a different table type
     */
    public void updateMetadata(
            String eventId,
            TableAndStatistics sourceTableAndStatistics,
            PartitionsAndStatistics sourcePartitionsAndStatistics,
            String replicaDatabaseName,
            String replicaTableName,
            ReplicaLocationManager locationManager) {
        try (CloseableMetaStoreClient client = getMetaStoreClientSupplier().get()) {
            updateTableMetadata(client, eventId, sourceTableAndStatistics, replicaDatabaseName, replicaTableName,
                    locationManager.getTableLocation(), replicationMode);

            List<Partition> oldPartitions = getOldPartitions(sourcePartitionsAndStatistics, replicaDatabaseName,
                    replicaTableName, client);
            LOG.debug("Found {} existing partitions that may match.", oldPartitions.size());
            replicaCatalogListener.existingReplicaPartitions(
                    EventUtils.toEventPartitions(sourceTableAndStatistics.getTable(), oldPartitions));
            Map<List<String>, Partition> oldPartitionsByKey = mapPartitionsByKey(oldPartitions);

            List<Partition> sourcePartitions = sourcePartitionsAndStatistics.getPartitions();
            List<Partition> partitionsToCreate = new ArrayList<>(sourcePartitions.size());
            List<Partition> partitionsToAlter = new ArrayList<>(sourcePartitions.size());
            List<ColumnStatistics> statisticsToSet = new ArrayList<>(sourcePartitions.size());

            for (Partition sourcePartition : sourcePartitions) {
                Path replicaPartitionLocation = locationManager.getPartitionLocation(sourcePartition);
                LOG.debug("Generated replica partition path: {}", replicaPartitionLocation);
                Partition replicaPartition = tableFactory.newReplicaPartition(eventId,
                        sourceTableAndStatistics.getTable(), sourcePartition, replicaDatabaseName, replicaTableName,
                        replicaPartitionLocation, replicationMode);

                Partition oldPartition = oldPartitionsByKey.get(sourcePartition.getValues());
                if (oldPartition == null) {
                    partitionsToCreate.add(replicaPartition);
                } else {
                    partitionsToAlter.add(replicaPartition);
                    if (LocationUtils.hasLocation(oldPartition)) {
                        // The replaced partition's data becomes obsolete once the alter succeeds.
                        Path oldLocation = LocationUtils.locationAsPath(oldPartition);
                        String oldEventId = oldPartition.getParameters()
                                .get(CircusTrainTableParameter.REPLICATION_EVENT.parameterName());
                        locationManager.addCleanUpLocation(oldEventId, oldLocation);
                    }
                }

                ColumnStatistics sourcePartitionStatistics =
                        sourcePartitionsAndStatistics.getStatisticsForPartition(sourcePartition);
                if (sourcePartitionStatistics != null) {
                    statisticsToSet.add(tableFactory.newReplicaPartitionStatistics(
                            sourceTableAndStatistics.getTable(), replicaPartition, sourcePartitionStatistics));
                }
            }

            replicaCatalogListener.partitionsToAlter(
                    EventUtils.toEventPartitions(sourceTableAndStatistics.getTable(), partitionsToAlter));
            replicaCatalogListener.partitionsToCreate(
                    EventUtils.toEventPartitions(sourceTableAndStatistics.getTable(), partitionsToCreate));

            createPartitions(client, partitionsToCreate, replicaDatabaseName, replicaTableName);
            alterPartitions(client, partitionsToAlter, replicaDatabaseName, replicaTableName);
            setPartitionStatistics(client, statisticsToSet, replicaDatabaseName, replicaTableName);
        }
    }

    /** Adds the given partitions to the replica table in batches; no-op for an empty list. */
    private void createPartitions(
            CloseableMetaStoreClient client,
            List<Partition> partitionsToCreate,
            String replicaDatabaseName,
            String replicaTableName) {
        if (partitionsToCreate.isEmpty()) {
            return;
        }
        LOG.info("Creating {} new partitions.", partitionsToCreate.size());
        try {
            int start = 0;
            for (List<Partition> batch : Lists.partition(partitionsToCreate, partitionBatchSize)) {
                LOG.info("Creating partitions {} through {}", start, start + batch.size() - 1);
                client.add_partitions(batch);
                start += partitionBatchSize;
            }
        } catch (TException e) {
            throw new MetaStoreClientException("Unable to add partitions '" + partitionsToCreate
                    + "' to replica table '" + replicaDatabaseName + "." + replicaTableName + "'", e);
        }
    }

    /** Alters the given partitions of the replica table in batches; no-op for an empty list. */
    private void alterPartitions(
            CloseableMetaStoreClient client,
            List<Partition> partitionsToAlter,
            String replicaDatabaseName,
            String replicaTableName) {
        if (partitionsToAlter.isEmpty()) {
            return;
        }
        LOG.info("Altering {} existing partitions.", partitionsToAlter.size());
        try {
            int start = 0;
            for (List<Partition> batch : Lists.partition(partitionsToAlter, partitionBatchSize)) {
                LOG.info("Altering partitions {} through {}", start, start + batch.size() - 1);
                client.alter_partitions(replicaDatabaseName, replicaTableName, batch);
                start += partitionBatchSize;
            }
        } catch (TException e) {
            throw new MetaStoreClientException("Unable to alter partitions '" + partitionsToAlter
                    + "' of replica table '" + replicaDatabaseName + "." + replicaTableName + "'", e);
        }
    }

    /** Sets partition column statistics in batches, or logs that there are none to set. */
    private void setPartitionStatistics(
            CloseableMetaStoreClient client,
            List<ColumnStatistics> statisticsToSet,
            String replicaDatabaseName,
            String replicaTableName) {
        if (statisticsToSet.isEmpty()) {
            LOG.debug("No partition column stats to set.");
            return;
        }
        LOG.info("Setting column statistics for {} partitions.", statisticsToSet.size());
        try {
            int start = 0;
            for (List<ColumnStatistics> batch : Lists.partition(statisticsToSet, partitionBatchSize)) {
                LOG.info("Setting column statistics for partitions {} through {}", start,
                        start + batch.size() - 1);
                client.setPartitionColumnStatistics(new SetPartitionsStatsRequest(batch));
                start += partitionBatchSize;
            }
        } catch (TException e) {
            throw new MetaStoreClientException("Unable to set column statistics of replica table '"
                    + replicaDatabaseName + "." + replicaTableName + "'", e);
        }
    }

    /**
     * Fetches the replica partitions whose names match the source partition names.
     *
     * @throws MetaStoreClientException if the metastore lookup fails
     */
    private List<Partition> getOldPartitions(
            PartitionsAndStatistics sourcePartitionsAndStatistics,
            String replicaDatabaseName,
            String replicaTableName,
            CloseableMetaStoreClient client) {
        List<String> partitionValues = sourcePartitionsAndStatistics.getPartitionNames();
        try {
            return client.getPartitionsByNames(replicaDatabaseName, replicaTableName, partitionValues);
        } catch (TException e) {
            throw new MetaStoreClientException("Unable to list current partitions of replica table '"
                    + replicaDatabaseName + "." + replicaTableName + "' with partition values '" + partitionValues
                    + "'", e);
        }
    }

    /** Indexes the partitions by their list of partition values. */
    private Map<List<String>, Partition> mapPartitionsByKey(List<Partition> partitions) {
        Map<List<String>, Partition> partitionsByKey = new HashMap<>(partitions.size());
        for (Partition partition : partitions) {
            partitionsByKey.put(partition.getValues(), partition);
        }
        return partitionsByKey;
    }

    /**
     * Creates the replica table if it does not exist yet, otherwise alters the existing one, and
     * updates table column statistics in either case.
     *
     * @return the replica table as it was before this call, or absent if it had to be created
     * @throws MetaStoreClientException if the create or alter operation fails
     * @throws CircusTrainException if an existing replica table has a different table type
     */
    private Optional<Table> updateTableMetadata(
            CloseableMetaStoreClient client,
            String eventId,
            TableAndStatistics sourceTable,
            String replicaDatabaseName,
            String replicaTableName,
            Path tableLocation,
            ReplicationMode replicationMode) {
        LOG.info("Updating replica table metadata.");
        TableAndStatistics replicaTable = tableFactory.newReplicaTable(eventId, sourceTable, replicaDatabaseName,
                replicaTableName, tableLocation, replicationMode);
        Optional<Table> oldReplicaTable = getTable(client, replicaDatabaseName, replicaTableName);

        // The two paths must be mutually exclusive: calling get() on an absent Optional throws
        // IllegalStateException, so the alter path may only run when a previous table exists.
        if (oldReplicaTable.isPresent()) {
            makeSureCanReplicate(oldReplicaTable.get(), replicaTable.getTable());
            LOG.debug("Existing replica table found, altering.");
            try {
                alterTableService.alterTable(client, oldReplicaTable.get(), replicaTable.getTable());
                updateTableColumnStatistics(client, replicaTable);
            } catch (Exception e) {
                throw new MetaStoreClientException("Unable to alter replica table '" + replicaDatabaseName + "."
                        + replicaTableName + "'", e);
            }
        } else {
            LOG.debug("No existing replica table found, creating.");
            try {
                client.createTable(replicaTable.getTable());
                updateTableColumnStatistics(client, replicaTable);
            } catch (TException e) {
                throw new MetaStoreClientException("Unable to create replica table '" + replicaDatabaseName + "."
                        + replicaTableName + "'", e);
            }
        }
        return oldReplicaTable;
    }

    /**
     * Rejects a replication that would replace a table with one of a different table type.
     *
     * @throws CircusTrainException if the table types differ
     */
    private void makeSureCanReplicate(Table oldReplicaTable, Table replicaTable) {
        if (Objects.equals(oldReplicaTable.getTableType(), replicaTable.getTableType())) {
            return;
        }
        String message = String.format("Unable to replace %s %s.%s with %s %s.%s",
                oldReplicaTable.getTableType(), oldReplicaTable.getDbName(), oldReplicaTable.getTableName(),
                replicaTable.getTableType(), replicaTable.getDbName(), replicaTable.getTableName());
        throw new CircusTrainException(message);
    }

    /**
     * Checks, if the destination table already exists, that it is a replica whose recorded
     * replication mode is compatible with the configured one. Does nothing if the table is absent.
     *
     * @throws DestinationNotReplicaException if the existing table carries no replication event
     * @throws InvalidReplicationModeException if the replication modes are incompatible
     */
    public void validateReplicaTable(String replicaDatabaseName, String replicaTableName) {
        try (CloseableMetaStoreClient client = getMetaStoreClientSupplier().get()) {
            Optional<Table> oldReplicaTable = getTable(client, replicaDatabaseName, replicaTableName);
            if (oldReplicaTable.isPresent()) {
                LOG.debug("Existing table found, checking that it is a valid replica.");
                determineValidityOfReplica(replicationMode, oldReplicaTable.get());
            }
        }
    }

    /**
     * Verifies that the existing table is a replica (has a non-blank replication event parameter)
     * and that switching to or from {@link ReplicationMode#METADATA_MIRROR} is not attempted.
     * A missing or unrecognised stored replication mode is treated as "not a metadata mirror".
     */
    private void determineValidityOfReplica(ReplicationMode replicationMode, Table oldReplicaTable) {
        String previousEvent = oldReplicaTable.getParameters()
                .get(CircusTrainTableParameter.REPLICATION_EVENT.parameterName());
        if (StringUtils.isBlank(previousEvent)) {
            throw new DestinationNotReplicaException(oldReplicaTable,
                    getHiveConf().getVar(HiveConf.ConfVars.METASTOREURIS),
                    CircusTrainTableParameter.REPLICATION_EVENT);
        }

        LOG.debug("Checking that replication modes are compatible.");
        String storedMode = oldReplicaTable.getParameters()
                .get(CircusTrainTableParameter.REPLICATION_MODE.parameterName());
        Optional<ReplicationMode> replicaReplicationMode =
                Enums.getIfPresent(ReplicationMode.class, Strings.nullToEmpty(storedMode));

        boolean replicaIsMirror = replicaReplicationMode.isPresent()
                && replicaReplicationMode.get() == ReplicationMode.METADATA_MIRROR;
        boolean mirrorRequested = replicationMode == ReplicationMode.METADATA_MIRROR;

        if (replicaIsMirror && !mirrorRequested) {
            throw new InvalidReplicationModeException("Trying a " + replicationMode.name()
                    + " replication on a table that was previously only " + ReplicationMode.METADATA_MIRROR.name()
                    + "-ed. This is not possible, rerun with a different table name or change the replication mode to "
                    + ReplicationMode.METADATA_MIRROR.name() + ".");
        }
        if (!replicaIsMirror && mirrorRequested) {
            throw new InvalidReplicationModeException("Trying to " + ReplicationMode.METADATA_MIRROR.name()
                    + " a previously replicated table. This is not possible, rerun with a different table name or change the replication mode to "
                    + ReplicationMode.FULL.name() + ", " + ReplicationMode.FULL_OVERWRITE.name() + ", or "
                    + ReplicationMode.METADATA_UPDATE.name() + ".");
        }
        LOG.debug("Replication modes are compatible.");
    }

    /**
     * Pushes the table-level column statistics to the metastore, if the replica table has any.
     *
     * @throws TException if the metastore rejects the statistics update
     */
    private void updateTableColumnStatistics(CloseableMetaStoreClient client, TableAndStatistics replicaTable)
            throws TException {
        if (replicaTable.getStatistics() == null) {
            LOG.debug("No column statistics to update for table {}.{}", replicaTable.getTable().getDbName(),
                    replicaTable.getTable().getTableName());
            return;
        }
        LOG.debug("Updating {} column statistics for table {}.{}",
                replicaTable.getStatistics().getStatsObj().size(), replicaTable.getTable().getDbName(),
                replicaTable.getTable().getTableName());
        client.updateTableColumnStatistics(replicaTable.getStatistics());
    }

    /**
     * Builds the location manager for a replication event, wired with a clean-up location manager
     * obtained from {@link CleanupLocationManagerFactory} for this endpoint's table replication.
     */
    public ReplicaLocationManager getLocationManager(
            TableType tableType,
            String targetTableLocation,
            String eventId,
            SourceLocationManager sourceLocationManager) {
        CleanupLocationManager cleanupLocationManager = CleanupLocationManagerFactory.newInstance(eventId,
                housekeepingListener, replicaCatalogListener, tableReplication);
        return new FullReplicationReplicaLocationManager(sourceLocationManager, targetTableLocation, eventId,
                tableType, cleanupLocationManager, replicaCatalogListener);
    }

    /** Looks up the table and statistics using the replica database and table names. */
    @Override
    public TableAndStatistics getTableAndStatistics(TableReplication tableReplication) {
        return super.getTableAndStatistics(tableReplication.getReplicaDatabaseName(),
                tableReplication.getReplicaTableName());
    }

    /**
     * When the replication mode is {@link ReplicationMode#FULL_OVERWRITE}, delegates to
     * {@link DropTableService#dropTableAndData} for the given replica table; otherwise does nothing.
     *
     * @throws Exception if dropping the table or its data fails
     */
    public void cleanupReplicaTableIfRequired(
            String replicaDatabaseName,
            String replicaTableName,
            DataManipulator dataManipulator) throws Exception {
        if (replicationMode != ReplicationMode.FULL_OVERWRITE) {
            return;
        }
        LOG.debug("Replication mode: FULL_OVERWRITE. Checking for existing replica table.");
        try (CloseableMetaStoreClient client = getMetaStoreClientSupplier().get()) {
            DropTableService dropTableService = new DropTableService();
            dropTableService.dropTableAndData(client, replicaDatabaseName, replicaTableName, dataManipulator);
        }
    }
}

