/*
 * Decompiled with CFR 0.152.
 */
package ru.yoomoney.tech.dbqueue.spring.dao;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.springframework.dao.support.DataAccessUtils;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import ru.yoomoney.tech.dbqueue.api.TaskRecord;
import ru.yoomoney.tech.dbqueue.config.QueueTableSchema;
import ru.yoomoney.tech.dbqueue.dao.QueuePickTaskDao;
import ru.yoomoney.tech.dbqueue.settings.FailRetryType;
import ru.yoomoney.tech.dbqueue.settings.FailureSettings;
import ru.yoomoney.tech.dbqueue.settings.QueueLocation;

/**
 * {@link QueuePickTaskDao} implementation that picks the next due task of a single queue.
 *
 * <p>A pick consists of three statements: a SELECT that finds the id of the earliest due task,
 * an UPDATE that increments its attempt counters and moves its next-process time forward
 * according to the configured {@link FailRetryType}, and a final SELECT that reads the row back
 * as a {@link TaskRecord}.
 *
 * <p>Ids that are currently being picked by this instance are tracked in an in-memory
 * {@link RowIdLocker} and excluded from the first SELECT, so concurrent {@link #pickTask()}
 * calls on the same instance do not select the same row. The exclusion is in-memory only and
 * is not shared between instances.
 *
 * <p>NOTE(review): the exclusion compares {@code _ROWID_} with values read from the id column,
 * so it presumably relies on the id column being the H2 row id - confirm against the table DDL.
 */
public class H2QueuePickTaskDao
implements QueuePickTaskDao {
    private final RowIdLocker rowIdLocker = new RowIdLocker();
    /**
     * UPDATE statement used to mark a task as picked. Rebuilt by the failure-settings observer
     * registered in the constructor; volatile so that a rebuilt statement is visible to threads
     * calling {@link #pickTask()} even if the observer runs on a different thread.
     */
    @Nonnull
    private volatile String pickTaskSql;
    /** SELECT that finds the id of the next due task; depends only on final fields, built once. */
    private final String selectSql;
    /** SELECT that reads a picked task back by id; depends only on final fields, built once. */
    private final String returnSql;
    private final NamedParameterJdbcTemplate jdbcTemplate;
    private final QueueTableSchema queueTableSchema;
    @Nonnull
    private final QueueLocation queueLocation;
    private final FailureSettings failureSettings;

    /**
     * Creates the DAO and subscribes to failure-settings changes so the UPDATE statement
     * follows the current retry type.
     *
     * @param jdbcOperations   JDBC operations used to execute all statements
     * @param queueTableSchema column names of the queue table
     * @param queueLocation    queue id and table name this DAO works with
     * @param failureSettings  retry type and retry interval applied when a task is picked
     * @throws NullPointerException if any argument is {@code null}
     */
    public H2QueuePickTaskDao(@Nonnull JdbcOperations jdbcOperations, @Nonnull QueueTableSchema queueTableSchema, @Nonnull QueueLocation queueLocation, @Nonnull FailureSettings failureSettings) {
        this.jdbcTemplate = new NamedParameterJdbcTemplate(Objects.requireNonNull(jdbcOperations));
        this.queueTableSchema = Objects.requireNonNull(queueTableSchema);
        this.queueLocation = Objects.requireNonNull(queueLocation);
        this.failureSettings = Objects.requireNonNull(failureSettings);
        this.selectSql = H2QueuePickTaskDao.getSelectSql(queueLocation, queueTableSchema);
        this.returnSql = H2QueuePickTaskDao.getReturnSql(queueLocation, queueTableSchema);
        this.pickTaskSql = H2QueuePickTaskDao.createPickTaskSql(queueLocation, failureSettings, queueTableSchema);
        // The retry type is baked into the UPDATE text, so it has to be rebuilt on every change.
        failureSettings.registerObserver((oldValue, newValue) -> {
            this.pickTaskSql = H2QueuePickTaskDao.createPickTaskSql(queueLocation, newValue, queueTableSchema);
        });
    }

    /**
     * Picks the next due task of the queue and marks it as attempted.
     *
     * @return the picked task, or {@code null} when no task is currently due (or the row could
     *         not be read back after the update)
     * @throws IllegalStateException if the UPDATE did not affect exactly one row
     */
    @Nullable
    public TaskRecord pickTask() {
        String queueId = this.queueLocation.getQueueId().asString();
        // The selection runs inside the locker so that choosing an id and registering it as
        // locked happen as one step with respect to other pickTask() calls for this queue.
        // NOTE(review): when nothing is locked, :rowIds is an empty collection - confirm the
        // resulting empty IN list is accepted by the H2 version in use.
        Long taskId = this.rowIdLocker.lock(queueId, rowIds -> {
            MapSqlParameterSource selectParams = new MapSqlParameterSource()
                    .addValue("queueId", queueId)
                    .addValue("rowIds", rowIds);
            List<Long> ids = this.jdbcTemplate.queryForList(this.selectSql, selectParams, Long.class);
            return DataAccessUtils.singleResult(ids);
        });
        if (taskId == null) {
            return null;
        }
        try {
            MapSqlParameterSource updateParams = new MapSqlParameterSource()
                    .addValue("retryInterval", this.failureSettings.getRetryInterval().getSeconds())
                    .addValue("taskId", taskId);
            int updatedRowCount = this.jdbcTemplate.update(this.pickTaskSql, updateParams);
            if (updatedRowCount != 1) {
                throw new IllegalStateException("Something wrong went here. Only row must be updated, not more!");
            }
            // Block-bodied lambda on purpose: it is value-returning only, which keeps the
            // ResultSetExtractor overload of query(...) unambiguous.
            TaskRecord taskRecord = this.jdbcTemplate.query(this.returnSql, new MapSqlParameterSource("taskId", taskId), rs -> {
                if (!rs.next()) {
                    return null;
                }
                return this.mapTaskRecord(rs);
            });
            return taskRecord;
        }
        finally {
            // Always release the id, including when the update or the read-back fails.
            this.rowIdLocker.unlock(queueId, taskId);
        }
    }

    /**
     * Builds a {@link TaskRecord} from the current row of the read-back query.
     *
     * @param rs result set already positioned on the task row
     * @return the task described by the current row
     * @throws SQLException if a regular column cannot be read
     * @throws RuntimeException wrapping the {@link SQLException} if an ext-field column cannot be read
     */
    private TaskRecord mapTaskRecord(ResultSet rs) throws SQLException {
        // LinkedHashMap keeps the ext fields in schema order.
        Map<String, String> additionalData = this.queueTableSchema.getExtFields().stream().collect(LinkedHashMap::new, (map, key) -> {
            try {
                map.put(key, rs.getString(key));
            }
            catch (SQLException e) {
                // The accumulator cannot throw checked exceptions; keep the cause attached.
                throw new RuntimeException(e);
            }
        }, Map::putAll);
        return TaskRecord.builder()
                .withId(rs.getLong(this.queueTableSchema.getIdField()))
                .withCreatedAt(H2QueuePickTaskDao.getZonedDateTime(rs, this.queueTableSchema.getCreatedAtField()))
                .withNextProcessAt(H2QueuePickTaskDao.getZonedDateTime(rs, this.queueTableSchema.getNextProcessAtField()))
                .withPayload(rs.getString(this.queueTableSchema.getPayloadField()))
                .withAttemptsCount(rs.getLong(this.queueTableSchema.getAttemptField()))
                .withReenqueueAttemptsCount(rs.getLong(this.queueTableSchema.getReenqueueAttemptField()))
                .withTotalAttemptsCount(rs.getLong(this.queueTableSchema.getTotalAttemptField()))
                .withExtData(additionalData)
                .build();
    }

    /**
     * Builds the SELECT returning the id of the task with the earliest next-process time that is
     * already due for the queue {@code :queueId}, skipping rows listed in {@code :rowIds}.
     */
    private static String getSelectSql(QueueLocation location, QueueTableSchema queueTableSchema) {
        return String.format("SELECT %s FROM %s WHERE %s = :queueId   AND %s <= now()   AND _ROWID_ NOT IN (:rowIds) ORDER BY %s ASC LIMIT 1 ", queueTableSchema.getIdField(), location.getTableName(), queueTableSchema.getQueueNameField(), queueTableSchema.getNextProcessAtField(), queueTableSchema.getNextProcessAtField());
    }

    /**
     * Builds the UPDATE that, for the task {@code :taskId}, sets the next-process time according
     * to the retry type and increments both the attempt and the total-attempt counters.
     */
    private static String createPickTaskSql(QueueLocation location, FailureSettings failureSettings, QueueTableSchema queueTableSchema) {
        return String.format("UPDATE %s SET   %s = %s,   %s = %s + 1,   %s = %s + 1 WHERE %s = :taskId ", location.getTableName(), queueTableSchema.getNextProcessAtField(), H2QueuePickTaskDao.getNextProcessTimeSql(failureSettings.getRetryType(), queueTableSchema), queueTableSchema.getAttemptField(), queueTableSchema.getAttemptField(), queueTableSchema.getTotalAttemptField(), queueTableSchema.getTotalAttemptField(), queueTableSchema.getIdField());
    }

    /**
     * Builds the SELECT that reads the task {@code :taskId} back: the fixed columns followed by
     * the ext fields, if the schema declares any.
     */
    private static String getReturnSql(QueueLocation location, QueueTableSchema queueTableSchema) {
        // The ext-field fragment carries its own leading ", " so the column list stays valid
        // when there are no ext fields.
        return String.format("SELECT     %s,    %s,    %s,    %s,    %s,    %s,    %s     %s  FROM %s WHERE %s = :taskId", queueTableSchema.getIdField(), queueTableSchema.getPayloadField(), queueTableSchema.getAttemptField(), queueTableSchema.getReenqueueAttemptField(), queueTableSchema.getTotalAttemptField(), queueTableSchema.getCreatedAtField(), queueTableSchema.getNextProcessAtField(), queueTableSchema.getExtFields().isEmpty() ? "" : queueTableSchema.getExtFields().stream().collect(Collectors.joining(", ", ", ", "")), location.getTableName(), queueTableSchema.getIdField());
    }

    /**
     * Reads a timestamp column and converts it to a {@link ZonedDateTime} in the JVM default zone.
     *
     * @param rs   result set positioned on a row
     * @param time name of the timestamp column
     * @throws SQLException if the column cannot be read
     * @throws NullPointerException if the column value is SQL NULL
     */
    private static ZonedDateTime getZonedDateTime(ResultSet rs, String time) throws SQLException {
        return ZonedDateTime.ofInstant(rs.getTimestamp(time).toInstant(), ZoneId.systemDefault());
    }

    /**
     * Returns the SQL expression for the new next-process time of a picked task.
     *
     * @param failRetryType    retry strategy that selects the expression
     * @param queueTableSchema schema providing the attempt column name
     * @return expression based on {@code NOW()}, the {@code :retryInterval} parameter (seconds)
     *         and, for the non-linear strategies, the attempt column
     * @throws IllegalStateException if the retry type is not one of the handled constants
     */
    private static String getNextProcessTimeSql(@Nonnull FailRetryType failRetryType, @Nonnull QueueTableSchema queueTableSchema) {
        Objects.requireNonNull(failRetryType, "retry type must be not null");
        Objects.requireNonNull(queueTableSchema, "queue table schema must be not null");
        switch (failRetryType) {
            case GEOMETRIC_BACKOFF: {
                // delay = 2^attempt * retryInterval
                return String.format("TIMESTAMPADD(SECOND, POWER(2, %s) * :retryInterval , NOW())", queueTableSchema.getAttemptField());
            }
            case ARITHMETIC_BACKOFF: {
                // delay = (1 + 2 * attempt) * retryInterval
                return String.format("TIMESTAMPADD(SECOND, (1 + %s * 2) * :retryInterval, NOW())", queueTableSchema.getAttemptField());
            }
            case LINEAR_BACKOFF: {
                // delay = retryInterval, independent of the attempt number
                return "TIMESTAMPADD(SECOND, :retryInterval, NOW())";
            }
        }
        throw new IllegalStateException("unknown retry type: " + failRetryType);
    }

    /**
     * In-memory registry of task ids that are currently being picked, grouped by queue name.
     */
    private static class RowIdLocker {
        private final Map<String, Set<Long>> lockedRowIds = new ConcurrentHashMap<>();

        private RowIdLocker() {
        }

        /**
         * Runs {@code taskIdExtractor} against the ids currently locked for the queue and, if it
         * yields an id, adds that id to the locked set.
         *
         * <p>The extractor is invoked inside {@link ConcurrentHashMap#compute}, so selection and
         * registration are atomic for a given queue name; other lock/unlock calls for the same
         * queue name wait until the extractor returns.
         *
         * @param queueName       queue whose locked ids are consulted and updated
         * @param taskIdExtractor receives the currently locked ids and returns the id to lock,
         *                        or {@code null} if there is nothing to lock
         * @return the id that was locked, or {@code null} if the extractor returned {@code null}
         */
        public Long lock(String queueName, Function<Set<Long>, Long> taskIdExtractor) {
            // Holder for passing the chosen id out of the compute() lambda.
            AtomicReference<Long> lockedTaskId = new AtomicReference<>();
            this.lockedRowIds.compute(queueName, (key, rowIds) -> {
                Set<Long> idSet = rowIds == null ? new HashSet<>() : rowIds;
                Long taskId = taskIdExtractor.apply(idSet);
                if (taskId != null) {
                    lockedTaskId.set(taskId);
                    idSet.add(taskId);
                }
                return idSet;
            });
            return lockedTaskId.get();
        }

        /**
         * Removes {@code taskId} from the locked ids of the queue; does nothing if the queue has
         * no entry. The (possibly empty) set stays in the map.
         *
         * @param queueName queue the id was locked for
         * @param taskId    id to release
         */
        public void unlock(String queueName, Long taskId) {
            this.lockedRowIds.computeIfPresent(queueName, (key, rowIds) -> {
                rowIds.remove(taskId);
                return rowIds;
            });
        }
    }
}

