/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sedona.core.spatialOperator;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.sedona.core.enums.IndexType;
import org.apache.sedona.core.enums.JoinBuildSide;
import org.apache.sedona.core.geometryObjects.Circle;
import org.apache.sedona.core.joinJudgement.DedupParams;
import org.apache.sedona.core.joinJudgement.DynamicIndexLookupJudgement;
import org.apache.sedona.core.joinJudgement.LeftIndexLookupJudgement;
import org.apache.sedona.core.joinJudgement.NestedLoopJudgement;
import org.apache.sedona.core.joinJudgement.RightIndexLookupJudgement;
import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.monitoring.Metrics;
import org.apache.sedona.core.spatialPartitioning.SpatialPartitioner;
import org.apache.sedona.core.spatialRDD.CircleRDD;
import org.apache.sedona.core.spatialRDD.SpatialRDD;
import org.apache.sedona.core.utils.GeomUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.locationtech.jts.geom.Geometry;
import scala.Tuple2;

public class JoinQuery {
    private static final Logger log = LogManager.getLogger(JoinQuery.class);

    /**
     * Checks that the two inputs agree on CRS transformation before they are joined.
     *
     * <p>The check fails when only one of the RDDs reports a CRS transformation, or when both
     * report one but their target EPSG codes differ (compared ignoring case).
     *
     * @param spatialRDD first RDD to compare
     * @param queryRDD second RDD to compare
     * @throws IllegalArgumentException if the CRS settings of the two RDDs are inconsistent
     */
    private static <U extends Geometry, T extends Geometry> void verifyCRSMatch(SpatialRDD<T> spatialRDD, SpatialRDD<U> queryRDD) throws Exception {
        if (spatialRDD.getCRStransformation() != queryRDD.getCRStransformation()) {
            throw new IllegalArgumentException("[JoinQuery] input RDD doesn't perform necessary CRS transformation. Please check your RDD constructors.");
        }

        // EPSG codes only need to be compared when both sides were transformed.
        if (!spatialRDD.getCRStransformation() || !queryRDD.getCRStransformation()) {
            return;
        }

        boolean sameTargetCode = spatialRDD.getTargetEpgsgCode().equalsIgnoreCase(queryRDD.getTargetEpgsgCode());
        if (sameTargetCode) {
            return;
        }
        throw new IllegalArgumentException("[JoinQuery] the EPSG codes of two input RDDs are different. Please check your RDD constructors.");
    }

    /**
     * Checks that both inputs have been spatially partitioned in the same way.
     *
     * <p>Both RDDs must expose a non-null {@code spatialPartitionedRDD}, their partitioners must be
     * equal, and the partitioned RDDs must have the same number of partitions.
     *
     * @param spatialRDD RDD whose partitioning is the reference
     * @param queryRDD RDD whose partitioning is compared against the reference
     * @throws NullPointerException if either {@code spatialPartitionedRDD} is null
     * @throws IllegalArgumentException if the partitioners or the partition counts differ
     */
    private static <U extends Geometry, T extends Geometry> void verifyPartitioningMatch(SpatialRDD<T> spatialRDD, SpatialRDD<U> queryRDD) throws Exception {
        Objects.requireNonNull(spatialRDD.spatialPartitionedRDD, "[JoinQuery] spatialRDD SpatialPartitionedRDD is null. Please do spatial partitioning.");
        Objects.requireNonNull(queryRDD.spatialPartitionedRDD, "[JoinQuery] queryRDD SpatialPartitionedRDD is null. Please use the spatialRDD's grids to do spatial partitioning.");

        SpatialPartitioner referencePartitioner = spatialRDD.getPartitioner();
        SpatialPartitioner candidatePartitioner = queryRDD.getPartitioner();
        boolean sameGrids = candidatePartitioner.equals(referencePartitioner);
        if (!sameGrids) {
            throw new IllegalArgumentException("[JoinQuery] queryRDD is not partitioned by the same grids with spatialRDD. Please make sure they both use the same grids otherwise wrong results will appear.");
        }

        int spatialNumPart = spatialRDD.spatialPartitionedRDD.getNumPartitions();
        int queryNumPart = queryRDD.spatialPartitionedRDD.getNumPartitions();
        if (spatialNumPart == queryNumPart) {
            return;
        }
        throw new IllegalArgumentException("[JoinQuery] numbers of partitions in queryRDD and spatialRDD don't match: " + queryNumPart + " vs. " + spatialNumPart + ". Please make sure they both use the same partitioning otherwise wrong results will appear.");
    }

    /**
     * Turns flat (key, value) join pairs into one (key, list of values) entry per group.
     *
     * <p>Pairs are grouped by {@code GeomUtils.hashCode} of their key geometry. The key of the
     * first pair in each group becomes the key of the emitted entry, and the values of all pairs
     * in the group are collected in iteration order.
     *
     * <p>NOTE(review): grouping relies solely on {@code GeomUtils.hashCode}; whether two distinct
     * key geometries can share a hash (and thus be merged into one entry) depends on that helper
     * and should be confirmed.
     *
     * @param input flat join result
     * @return one entry per group holding the group's key and all of its values
     */
    private static <U extends Geometry, T extends Geometry> JavaPairRDD<U, List<T>> collectGeometriesByKey(JavaPairRDD<U, T> input) {
        return input
                .groupBy(pair -> GeomUtils.hashCode(pair._1()))
                .values()
                .<U, List<T>>mapToPair(group -> {
                    Iterator<Tuple2<U, T>> pairs = group.iterator();
                    // The first pair supplies the key for the whole group.
                    Tuple2<U, T> first = pairs.next();
                    List<T> values = new ArrayList<>();
                    values.add(first._2());
                    while (pairs.hasNext()) {
                        values.add(pairs.next()._2());
                    }
                    return new Tuple2<>(first._1(), values);
                });
    }

    /**
     * Counts how many join pairs share each key.
     *
     * <p>Uses {@code aggregateByKey} with a {@code Long} zero value: each pair adds one to its
     * key's running count, and partial counts are summed when combined.
     *
     * @param input flat join result
     * @return one entry per key holding the number of pairs with that key
     */
    private static <U extends Geometry, T extends Geometry> JavaPairRDD<U, Long> countGeometriesByKey(JavaPairRDD<U, T> input) {
        // The zero value is passed as a Long so the accumulator type matches both functions.
        return input.aggregateByKey(
                0L,
                (Long count, T ignored) -> count + 1L,
                (Long left, Long right) -> Long.sum(left, right));
    }

    /**
     * Spatial join that returns, for each matched query geometry, the list of matching geometries
     * from {@code spatialRDD}.
     *
     * <p>Convenience overload: the flags are wrapped in a {@link JoinParams}, which supplies
     * {@code IndexType.RTREE} and {@code JoinBuildSide.RIGHT} for the remaining settings.
     *
     * @param spatialRDD RDD providing the values of the result
     * @param queryRDD RDD providing the keys of the result
     * @param useIndex whether the join should use a spatial index
     * @param considerBoundaryIntersection forwarded unchanged to the join judgement
     * @return query geometries paired with the list of their matches
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, List<T>> SpatialJoinQuery(SpatialRDD<T> spatialRDD, SpatialRDD<U> queryRDD, boolean useIndex, boolean considerBoundaryIntersection) throws Exception {
        return JoinQuery.SpatialJoinQuery(spatialRDD, queryRDD, new JoinParams(useIndex, considerBoundaryIntersection));
    }

    /**
     * Spatial join that returns, for each matched query geometry, the list of matching geometries
     * from {@code spatialRDD}.
     *
     * <p>{@code queryRDD} is handed to {@code spatialJoin} as the left side, so its geometries
     * become the keys of the result.
     *
     * @param spatialRDD RDD providing the values of the result
     * @param queryRDD RDD providing the keys of the result
     * @param joinParams join settings
     * @return query geometries paired with the list of their matches
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, List<T>> SpatialJoinQuery(SpatialRDD<T> spatialRDD, SpatialRDD<U> queryRDD, JoinParams joinParams) throws Exception {
        return JoinQuery.collectGeometriesByKey(JoinQuery.spatialJoin(queryRDD, spatialRDD, joinParams));
    }

    /**
     * Spatial join that returns one (query geometry, matching geometry) pair per match.
     *
     * <p>Convenience overload: the flags are wrapped in a {@link JoinParams}, which supplies
     * {@code IndexType.RTREE} and {@code JoinBuildSide.RIGHT} for the remaining settings.
     *
     * @param spatialRDD RDD providing the values of the result
     * @param queryRDD RDD providing the keys of the result
     * @param useIndex whether the join should use a spatial index
     * @param considerBoundaryIntersection forwarded unchanged to the join judgement
     * @return flat pairs of query geometry and matching geometry
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, T> SpatialJoinQueryFlat(SpatialRDD<T> spatialRDD, SpatialRDD<U> queryRDD, boolean useIndex, boolean considerBoundaryIntersection) throws Exception {
        return JoinQuery.SpatialJoinQueryFlat(spatialRDD, queryRDD, new JoinParams(useIndex, considerBoundaryIntersection));
    }

    /**
     * Spatial join that returns one (query geometry, matching geometry) pair per match.
     *
     * <p>{@code queryRDD} is handed to {@code spatialJoin} as the left side, so its geometries
     * become the keys of the result.
     *
     * @param spatialRDD RDD providing the values of the result
     * @param queryRDD RDD providing the keys of the result
     * @param joinParams join settings
     * @return flat pairs of query geometry and matching geometry
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, T> SpatialJoinQueryFlat(SpatialRDD<T> spatialRDD, SpatialRDD<U> queryRDD, JoinParams joinParams) throws Exception {
        JavaPairRDD<U, T> flatPairs = JoinQuery.spatialJoin(queryRDD, spatialRDD, joinParams);
        return flatPairs;
    }

    /**
     * Spatial join that returns, for each matched query geometry, the number of matches.
     *
     * <p>Convenience overload: the flags are wrapped in a {@link JoinParams}, which supplies
     * {@code IndexType.RTREE} and {@code JoinBuildSide.RIGHT} for the remaining settings.
     *
     * @param spatialRDD RDD whose geometries are counted
     * @param queryRDD RDD providing the keys of the result
     * @param useIndex whether the join should use a spatial index
     * @param considerBoundaryIntersection forwarded unchanged to the join judgement
     * @return query geometries paired with their match count
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, Long> SpatialJoinQueryCountByKey(SpatialRDD<T> spatialRDD, SpatialRDD<U> queryRDD, boolean useIndex, boolean considerBoundaryIntersection) throws Exception {
        return JoinQuery.SpatialJoinQueryCountByKey(spatialRDD, queryRDD, new JoinParams(useIndex, considerBoundaryIntersection));
    }

    /**
     * Spatial join that returns, for each matched query geometry, the number of matches.
     *
     * <p>{@code queryRDD} is handed to {@code spatialJoin} as the left side, so its geometries
     * become the keys of the result.
     *
     * @param spatialRDD RDD whose geometries are counted
     * @param queryRDD RDD providing the keys of the result
     * @param joinParams join settings
     * @return query geometries paired with their match count
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, Long> SpatialJoinQueryCountByKey(SpatialRDD<T> spatialRDD, SpatialRDD<U> queryRDD, JoinParams joinParams) throws Exception {
        return JoinQuery.countGeometriesByKey(JoinQuery.spatialJoin(queryRDD, spatialRDD, joinParams));
    }

    /**
     * Distance join that returns one (circle center geometry, matching geometry) pair per match.
     *
     * <p>Convenience overload: the flags are wrapped in a {@link JoinParams}, which supplies
     * {@code IndexType.RTREE} and {@code JoinBuildSide.RIGHT} for the remaining settings.
     *
     * @param spatialRDD RDD providing the values of the result
     * @param queryRDD circles to join against
     * @param useIndex whether the join should use a spatial index
     * @param considerBoundaryIntersection forwarded unchanged to the join judgement
     * @return flat pairs of center geometry and matching geometry
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <T extends Geometry> JavaPairRDD<Geometry, T> DistanceJoinQueryFlat(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex, boolean considerBoundaryIntersection) throws Exception {
        return JoinQuery.DistanceJoinQueryFlat(spatialRDD, queryRDD, new JoinParams(useIndex, considerBoundaryIntersection));
    }

    /**
     * Distance join that returns one (circle center geometry, matching geometry) pair per match.
     *
     * @param spatialRDD RDD providing the values of the result
     * @param queryRDD circles to join against
     * @param joinParams join settings
     * @return flat pairs of center geometry and matching geometry
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <T extends Geometry> JavaPairRDD<Geometry, T> DistanceJoinQueryFlat(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
        JavaPairRDD<Geometry, T> flatPairs = JoinQuery.distanceJoin(spatialRDD, queryRDD, joinParams);
        return flatPairs;
    }

    /**
     * Distance join that returns, for each matched circle center geometry, the list of matching
     * geometries from {@code spatialRDD}.
     *
     * <p>Convenience overload: the flags are wrapped in a {@link JoinParams}, which supplies
     * {@code IndexType.RTREE} and {@code JoinBuildSide.RIGHT} for the remaining settings.
     *
     * @param spatialRDD RDD providing the values of the result
     * @param queryRDD circles to join against
     * @param useIndex whether the join should use a spatial index
     * @param considerBoundaryIntersection forwarded unchanged to the join judgement
     * @return center geometries paired with the list of their matches
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <T extends Geometry> JavaPairRDD<Geometry, List<T>> DistanceJoinQuery(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex, boolean considerBoundaryIntersection) throws Exception {
        return JoinQuery.DistanceJoinQuery(spatialRDD, queryRDD, new JoinParams(useIndex, considerBoundaryIntersection));
    }

    /**
     * Distance join that returns, for each matched circle center geometry, the list of matching
     * geometries from {@code spatialRDD}.
     *
     * @param spatialRDD RDD providing the values of the result
     * @param queryRDD circles to join against
     * @param joinParams join settings
     * @return center geometries paired with the list of their matches
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <T extends Geometry> JavaPairRDD<Geometry, List<T>> DistanceJoinQuery(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
        return JoinQuery.collectGeometriesByKey(JoinQuery.distanceJoin(spatialRDD, queryRDD, joinParams));
    }

    /**
     * Distance join that returns, for each matched circle center geometry, the number of matches.
     *
     * <p>Convenience overload: the flags are wrapped in a {@link JoinParams}, which supplies
     * {@code IndexType.RTREE} and {@code JoinBuildSide.RIGHT} for the remaining settings.
     *
     * @param spatialRDD RDD whose geometries are counted
     * @param queryRDD circles to join against
     * @param useIndex whether the join should use a spatial index
     * @param considerBoundaryIntersection forwarded unchanged to the join judgement
     * @return center geometries paired with their match count
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <T extends Geometry> JavaPairRDD<Geometry, Long> DistanceJoinQueryCountByKey(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex, boolean considerBoundaryIntersection) throws Exception {
        return JoinQuery.DistanceJoinQueryCountByKey(spatialRDD, queryRDD, new JoinParams(useIndex, considerBoundaryIntersection));
    }

    /**
     * Distance join that returns, for each matched circle center geometry, the number of matches.
     *
     * @param spatialRDD RDD whose geometries are counted
     * @param queryRDD circles to join against
     * @param joinParams join settings
     * @return center geometries paired with their match count
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <T extends Geometry> JavaPairRDD<Geometry, Long> DistanceJoinQueryCountByKey(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
        return JoinQuery.countGeometriesByKey(JoinQuery.distanceJoin(spatialRDD, queryRDD, joinParams));
    }

    /**
     * Joins circles against {@code spatialRDD} and re-keys every match by the circle's center
     * geometry.
     *
     * <p>The circles are passed to {@code spatialJoin} as the left side; each resulting
     * (circle, geometry) pair is then mapped to (circle.getCenterGeometry(), geometry).
     *
     * @param spatialRDD RDD providing the values of the result
     * @param queryRDD circles to join against
     * @param joinParams join settings
     * @return flat pairs of center geometry and matching geometry
     * @throws Exception if the inputs fail CRS or partitioning verification
     */
    public static <T extends Geometry> JavaPairRDD<Geometry, T> distanceJoin(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
        JavaPairRDD<Circle, T> circleMatches = JoinQuery.spatialJoin(queryRDD, spatialRDD, joinParams);
        // Typed construction instead of a raw Tuple2 keeps the mapping free of unchecked warnings.
        return circleMatches.<Geometry, T>mapToPair(
                match -> new Tuple2<Geometry, T>(match._1().getCenterGeometry(), match._2()));
    }

    /**
     * Partition-wise spatial join of two RDDs that share the same spatial partitioning.
     *
     * <p>After verifying CRS and partitioning compatibility, the partitions of both sides are
     * zipped and evaluated by one of four judgements:
     * <ul>
     *   <li>{@code useIndex} and {@code rightRDD.indexedRDD} present: {@code RightIndexLookupJudgement};</li>
     *   <li>{@code useIndex} and only {@code leftRDD.indexedRDD} present: {@code LeftIndexLookupJudgement};</li>
     *   <li>{@code useIndex} but no index on either side: {@code DynamicIndexLookupJudgement}
     *       (a warning is logged);</li>
     *   <li>otherwise: {@code NestedLoopJudgement}.</li>
     * </ul>
     * The dedup parameters always come from the partitioner of {@code rightRDD}.
     *
     * @param leftRDD RDD whose geometries become the keys of the result
     * @param rightRDD RDD whose geometries become the values of the result
     * @param joinParams join settings
     * @return flat (left geometry, right geometry) pairs
     * @throws NullPointerException if either side has not been spatially partitioned
     * @throws IllegalArgumentException if CRS settings or partitioning of the two sides differ
     */
    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, T> spatialJoin(SpatialRDD<U> leftRDD, SpatialRDD<T> rightRDD, JoinParams joinParams) throws Exception {
        JoinQuery.verifyCRSMatch(leftRDD, rightRDD);
        JoinQuery.verifyPartitioningMatch(leftRDD, rightRDD);

        // The metrics are created up front but only handed to the dynamic-index judgement.
        SparkContext sparkContext = leftRDD.spatialPartitionedRDD.context();
        Metric buildCount = Metrics.createMetric(sparkContext, "buildCount");
        Metric streamCount = Metrics.createMetric(sparkContext, "streamCount");
        Metric resultCount = Metrics.createMetric(sparkContext, "resultCount");
        Metric candidateCount = Metrics.createMetric(sparkContext, "candidateCount");

        SpatialPartitioner partitioner = (SpatialPartitioner) rightRDD.spatialPartitionedRDD.partitioner().get();
        DedupParams dedupParams = partitioner.getDedupParams();
        // NOTE(review): this context is taken from rawSpatialRDD, which is not null-checked by the
        // verification above (only spatialPartitionedRDD is) — confirm it is always set.
        SparkContext cxt = leftRDD.rawSpatialRDD.context();

        // The judgement classes are used as raw types here, so the assignments below are unchecked.
        final JavaRDD<Pair<U, T>> joinResult;
        if (joinParams.useIndex) {
            if (rightRDD.indexedRDD != null) {
                RightIndexLookupJudgement judgement = new RightIndexLookupJudgement(joinParams.considerBoundaryIntersection, dedupParams);
                judgement.broadcastDedupParams(cxt);
                joinResult = leftRDD.spatialPartitionedRDD.zipPartitions(rightRDD.indexedRDD, judgement);
            } else if (leftRDD.indexedRDD != null) {
                LeftIndexLookupJudgement judgement = new LeftIndexLookupJudgement(joinParams.considerBoundaryIntersection, dedupParams);
                judgement.broadcastDedupParams(cxt);
                joinResult = leftRDD.indexedRDD.zipPartitions(rightRDD.spatialPartitionedRDD, judgement);
            } else {
                log.warn("UseIndex is true, but no index exists. Will build index on the fly.");
                DynamicIndexLookupJudgement judgement = new DynamicIndexLookupJudgement(joinParams.considerBoundaryIntersection, joinParams.indexType, joinParams.joinBuildSide, dedupParams, buildCount, streamCount, resultCount, candidateCount);
                judgement.broadcastDedupParams(cxt);
                joinResult = leftRDD.spatialPartitionedRDD.zipPartitions(rightRDD.spatialPartitionedRDD, judgement);
            }
        } else {
            // The nested-loop judgement is zipped starting from the right side's partitions.
            NestedLoopJudgement judgement = new NestedLoopJudgement(joinParams.considerBoundaryIntersection, dedupParams);
            judgement.broadcastDedupParams(cxt);
            joinResult = rightRDD.spatialPartitionedRDD.zipPartitions(leftRDD.spatialPartitionedRDD, judgement);
        }

        // Convert commons-lang Pair results into Scala tuples with the declared element types.
        return joinResult.<U, T>mapToPair(pair -> new Tuple2<U, T>(pair.getKey(), pair.getValue()));
    }

    /**
     * Immutable bundle of settings that control how a join is executed.
     */
    public static final class JoinParams {
        /** Whether the join should use a spatial index. */
        public final boolean useIndex;
        /** Flag forwarded unchanged to the join judgement. */
        public final boolean considerBoundaryIntersection;
        /** Index type passed to the judgement that builds an index on the fly. */
        public final IndexType indexType;
        /** Build side passed to the judgement that builds an index on the fly. */
        public final JoinBuildSide joinBuildSide;

        /**
         * Creates join settings with every value given explicitly.
         *
         * @param useIndex whether the join should use a spatial index
         * @param considerBoundaryIntersection flag forwarded to the join judgement
         * @param polygonIndexType value stored in {@link #indexType}
         * @param joinBuildSide value stored in {@link #joinBuildSide}
         */
        public JoinParams(boolean useIndex, boolean considerBoundaryIntersection, IndexType polygonIndexType, JoinBuildSide joinBuildSide) {
            this.joinBuildSide = joinBuildSide;
            this.indexType = polygonIndexType;
            this.considerBoundaryIntersection = considerBoundaryIntersection;
            this.useIndex = useIndex;
        }

        /**
         * Creates join settings with {@code IndexType.RTREE} and {@code JoinBuildSide.RIGHT}.
         *
         * @param useIndex whether the join should use a spatial index
         * @param considerBoundaryIntersection flag forwarded to the join judgement
         */
        public JoinParams(boolean useIndex, boolean considerBoundaryIntersection) {
            this(useIndex, considerBoundaryIntersection, IndexType.RTREE, JoinBuildSide.RIGHT);
        }
    }
}

