/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.transactions.cleanup;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.codec.JsonSerializer;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.LookupInOptions;
import com.couchbase.client.java.kv.LookupInResult;
import com.couchbase.client.java.kv.LookupInSpec;
import com.couchbase.client.java.kv.MutateInMacro;
import com.couchbase.client.java.kv.MutateInOptions;
import com.couchbase.client.java.kv.MutateInSpec;
import com.couchbase.client.java.kv.StoreSemantics;
import com.couchbase.transactions.TransactionsReactive;
import com.couchbase.transactions.cleanup.AccessError;
import com.couchbase.transactions.cleanup.ClientRecordDetails;
import com.couchbase.transactions.cleanup.ClusterData;
import com.couchbase.transactions.cleanup.TransactionsCleanup;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.components.SerializationUtil;
import com.couchbase.transactions.config.MergedTransactionConfig;
import com.couchbase.transactions.config.TransactionConfig;
import com.couchbase.transactions.error.internal.ErrorClasses;
import com.couchbase.transactions.log.SimpleEventBusLogger;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import com.couchbase.transactions.support.SpanWrapperUtil;
import com.couchbase.transactions.util.DebugUtil;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;

@Stability.Internal
public class ClientRecord {
    public static String CLIENT_RECORD_DOC_ID = "_txn:client-record";
    private final ClusterData clusterData;
    private final SimpleEventBusLogger LOGGER;
    private static final String FIELD_HEARTBEAT = "heartbeat_ms";
    private static final String FIELD_EXPIRES = "expires_ms";
    private static final String FIELD_NUM_ATRS = "num_atrs";
    private static final String FIELD_HOST = "host";
    private static final String FIELD_IMPLEMENTATION = "implementation";
    private static final String FIELD_VERSION = "version";
    private static final String FIELD_PROCESS_ID = "process_id";
    public static final String FIELD_RECORDS = "records";
    public static final String FIELD_CLIENTS = "clients";
    public static final String FIELD_OVERRIDE = "override";
    public static final String FIELD_OVERRIDE_ENABLED = "enabled";
    public static final String FIELD_OVERRIDE_EXPIRES = "expires";
    private static int SAFETY_MARGIN_EXPIRY_MSECS = 20000;
    private static final Duration TIMEOUT = Duration.ofMillis(500L);
    private static final Duration BACKOFF_START = Duration.of(10L, ChronoUnit.MILLIS);
    private static final Duration BACKOFF_END = Duration.of(250L, ChronoUnit.MILLIS);

    public ClientRecord(ClusterData clusterData) {
        this.LOGGER = new SimpleEventBusLogger(clusterData.cluster().environment().eventBus(), TransactionsCleanup.CATEGORY_CLIENT_RECORD);
        this.clusterData = clusterData;
    }

    public Flux<Void> removeClientFromAllBuckets(String clientUuid) {
        return this.removeClientFromAllBuckets(clientUuid, TIMEOUT);
    }

    public Flux<Void> removeClientFromAllBuckets(String clientUuid, Duration timeout) {
        return Flux.fromIterable(this.clusterData.bucketNames()).subscribeOn(Schedulers.elastic()).flatMap(bucketName -> this.clusterData.getBucketDefaultCollection((String)bucketName).flatMap(collection -> this.beforeRemoveClient(this).then(collection.mutateIn(CLIENT_RECORD_DOC_ID, Arrays.asList(MutateInSpec.remove((String)("records.clients." + clientUuid)).xattr()), (MutateInOptions)MutateInOptions.mutateInOptions().serializer((JsonSerializer)SerializationUtil.DEFAULT_JSON_SERIALIZER).clientContext(OptionsWrapperUtil.createClientContext("ClientRecord::removeClientFromAllBuckets"))))).onErrorResume(err -> {
            switch (ErrorClasses.classify(err)) {
                case FAIL_DOC_NOT_FOUND: {
                    this.LOGGER.debug(String.format("%s/%s remove skipped as client record does not exist", bucketName, clientUuid));
                    return Mono.empty();
                }
                case FAIL_PATH_NOT_FOUND: {
                    this.LOGGER.debug(String.format("%s/%s remove skipped as client record entry does not exist", bucketName, clientUuid));
                    return Mono.empty();
                }
            }
            this.LOGGER.debug(String.format("%s/%s got error while removing client from client record: %s", bucketName, clientUuid, DebugUtil.dbg(err)));
            return Mono.error((Throwable)err);
        }).retryWhen(Retry.any().exponentialBackoff(BACKOFF_START, BACKOFF_END).doOnRetry(v -> this.LOGGER.info(String.format("%s/%s retrying removing client from record on error %s", bucketName, clientUuid, DebugUtil.dbg(v.exception())))).toReactorRetry()).timeout(timeout).doOnNext(v -> this.LOGGER.info(String.format("%s/%s removed from client record", bucketName, clientUuid))).doOnError(err -> this.LOGGER.info(String.format("got error while removing client record '%s'", err))).then());
    }

    public static ClientRecordDetails parseClientRecord(LookupInResult clientRecord, String clientUuid) {
        JsonObject records = (JsonObject)clientRecord.contentAs(0, JsonObject.class);
        JsonObject hlcRaw = (JsonObject)clientRecord.contentAs(1, JsonObject.class);
        ActiveTransactionRecord.ParsedHLC parsedHLC = new ActiveTransactionRecord.ParsedHLC(hlcRaw);
        JsonObject clients = records.getObject(FIELD_CLIENTS);
        ArrayList<String> expiredClientIds = new ArrayList<String>();
        ArrayList<String> activeClientIds = new ArrayList<String>();
        clients.getNames().forEach(otherClientId -> {
            boolean out;
            int expiresMsecs;
            long heartbeatMsecs;
            JsonObject cl = (JsonObject)clients.get(otherClientId);
            long casMillis = parsedHLC.nowInNanos() / 1000000L;
            long expiredPeriod = casMillis - (heartbeatMsecs = ActiveTransactionRecord.parseMutationCAS(cl.getString(FIELD_HEARTBEAT)));
            boolean hasExpired = expiredPeriod >= (long)(expiresMsecs = cl.getInt(FIELD_EXPIRES).intValue());
            boolean bl = out = hasExpired && !otherClientId.equals(clientUuid);
            if (out) {
                expiredClientIds.add((String)otherClientId);
            } else {
                activeClientIds.add((String)otherClientId);
            }
        });
        if (!activeClientIds.contains(clientUuid)) {
            activeClientIds.add(clientUuid);
        }
        List sortedActiveClientIds = activeClientIds.stream().sorted().collect(Collectors.toList());
        int indexOfThisClient = sortedActiveClientIds.indexOf(clientUuid);
        int numExpiredClients = expiredClientIds.size();
        int numActiveClients = sortedActiveClientIds.size();
        int numExistingClients = numExpiredClients + numActiveClients;
        boolean alreadyContainsClient = clients.containsKey(clientUuid);
        boolean overrideEnabled = false;
        long overrideExpiresCas = 0L;
        JsonObject override = records.getObject(FIELD_OVERRIDE);
        if (override != null) {
            overrideEnabled = override.getBoolean(FIELD_OVERRIDE_ENABLED);
            overrideExpiresCas = override.getLong(FIELD_OVERRIDE_EXPIRES);
        }
        return new ClientRecordDetails(numActiveClients, indexOfThisClient, !alreadyContainsClient, expiredClientIds, numExistingClients, numExpiredClients, overrideEnabled, overrideExpiresCas, parsedHLC.nowInNanos());
    }

    public Mono<ClientRecordDetails> getClientRecord(String bucketName) {
        return this.clusterData.getBucketDefaultCollection(bucketName).flatMap(coll -> ClientRecord.getClientRecord(coll, null)).map(cr -> ClientRecord.parseClientRecord(cr, "not_client"));
    }

    public static Mono<LookupInResult> getClientRecord(ReactiveCollection collection, @Nullable SpanWrapper span) {
        return collection.lookupIn(CLIENT_RECORD_DOC_ID, Arrays.asList(LookupInSpec.get((String)FIELD_RECORDS).xattr(), LookupInSpec.get((String)"$vbucket.HLC").xattr()), (LookupInOptions)((LookupInOptions)LookupInOptions.lookupInOptions().parentSpan(span == null ? null : span.span())).serializer((JsonSerializer)SerializationUtil.DEFAULT_JSON_SERIALIZER).clientContext(OptionsWrapperUtil.createClientContext("ClientRecord::processClient")));
    }

    private RequestTracer tracer() {
        return this.clusterData.cluster().environment().requestTracer();
    }

    public Mono<ClientRecordDetails> processClient(String clientUuid, ReactiveCollection collection, TransactionConfig config) {
        return this.processClient(clientUuid, collection, new MergedTransactionConfig(config, Optional.empty()), null);
    }

    public Mono<ClientRecordDetails> processClient(String clientUuid, ReactiveCollection collection, MergedTransactionConfig config, @Nullable SpanWrapper pspan) {
        SpanWrapper span = SpanWrapperUtil.createOp(null, this.tracer(), collection, CLIENT_RECORD_DOC_ID, "cleanup.client", pspan).attribute("db.couchbase.transactions.cleanup.client_uuid", clientUuid);
        String bp = collection.bucketName() + "/" + collection.name() + "/" + clientUuid;
        return this.beforeGetRecord(this).then(ClientRecord.getClientRecord(collection, span)).flatMap(clientRecord -> {
            ClientRecordDetails cr = ClientRecord.parseClientRecord(clientRecord, clientUuid);
            this.LOGGER.debug(String.format("%s found %d existing clients including this (%s active, %d expired), included this=%s, index of this=%d, override={enabled=%s,expires=%d,now=%d,active=%s}", bp, cr.numExistingClients(), cr.numActiveClients(), cr.numExpiredClients(), !cr.clientIsNew(), cr.indexOfThisClient(), cr.overrideEnabled(), cr.overrideExpires(), cr.casNow(), cr.overrideActive()));
            if (cr.overrideActive()) {
                return Mono.just((Object)cr);
            }
            ArrayList<Object> specs = new ArrayList<Object>();
            String field = "records.clients." + clientUuid + ".";
            String host = "unavailable";
            try {
                host = InetAddress.getLocalHost().getHostAddress();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            long pid = 0L;
            String name = ManagementFactory.getRuntimeMXBean().getName();
            try {
                pid = Long.parseLong(name.split("@")[0]);
            }
            catch (Throwable err) {
                this.LOGGER.debug(String.format("Discarding error %s while trying to parse PID %s", err.getMessage(), name));
            }
            specs.add(MutateInSpec.upsert((String)field, (Object)JsonObject.create().put(FIELD_EXPIRES, config.cleanupWindow().toMillis() + (long)SAFETY_MARGIN_EXPIRY_MSECS).put(FIELD_NUM_ATRS, config.numAtrs()).put(FIELD_IMPLEMENTATION, "java").put(FIELD_VERSION, TransactionsReactive.class.getPackage().getImplementationVersion()).put(FIELD_HOST, host).put(FIELD_PROCESS_ID, pid)).xattr().createPath());
            specs.add(MutateInSpec.upsert((String)(field + FIELD_HEARTBEAT), (Object)MutateInMacro.CAS).createPath());
            cr.expiredClientIds().stream().limit(16 - specs.size() - 1).forEach(expiredClientId -> {
                this.LOGGER.debug(String.format("%s removing expired client %s", bp, expiredClientId));
                specs.add(MutateInSpec.remove((String)("records.clients." + expiredClientId)).xattr());
            });
            specs.add(MutateInSpec.replace((String)"", (Object)new byte[]{0}));
            return this.beforeUpdateRecord(this).then(collection.mutateIn(CLIENT_RECORD_DOC_ID, specs, (MutateInOptions)((MutateInOptions)MutateInOptions.mutateInOptions().serializer((JsonSerializer)SerializationUtil.DEFAULT_JSON_SERIALIZER).parentSpan(span.span())).clientContext(OptionsWrapperUtil.createClientContext("ClientRecord::processClient")))).thenReturn((Object)cr);
        }).onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            this.LOGGER.debug(String.format("%s got error processing client record: %s", bp, DebugUtil.dbg(err)));
            if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                return this.createClientRecord(clientUuid, config, collection, span).then(this.processClient(clientUuid, collection, config, pspan));
            }
            if (err instanceof CouchbaseException && ((CouchbaseException)err).context().responseStatus() == ResponseStatus.NO_ACCESS) {
                return Mono.error((Throwable)((Object)new AccessError()));
            }
            return Mono.error((Throwable)err);
        }).doOnNext(v -> span.finish()).doOnError(err -> span.failWith((Throwable)err));
    }

    private Mono<Void> createClientRecord(String clientUuid, MergedTransactionConfig config, ReactiveCollection collection, SpanWrapper pspan) {
        String bp = collection.bucketName() + "/" + collection.name() + "/" + clientUuid;
        return this.beforeCreateRecord(this).then(collection.mutateIn(CLIENT_RECORD_DOC_ID, Arrays.asList(MutateInSpec.insert((String)"records.clients", (Object)JsonObject.create()).xattr(), MutateInSpec.replace((String)"", (Object)new byte[]{0})), OptionsWrapperUtil.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("ClientRecord::createClientRecord"))).storeSemantics(StoreSemantics.INSERT), pspan, config, collection.core()))).doOnSubscribe(v -> this.LOGGER.debug(String.format("%s found client record does not exist, creating and retrying", bp))).onErrorResume(e -> {
            if (ErrorClasses.FAIL_DOC_ALREADY_EXISTS == ErrorClasses.classify(e)) {
                this.LOGGER.debug(String.format("%s found client record exists after retry, another client must have created it, continuing", bp));
                return Mono.empty();
            }
            if (e instanceof CouchbaseException && ((CouchbaseException)e).context().responseStatus() == ResponseStatus.NO_ACCESS) {
                return Mono.error((Throwable)((Object)new AccessError()));
            }
            this.LOGGER.info(String.format("got error while creating client record '%s'", e));
            return Mono.error((Throwable)e);
        }).then();
    }

    protected Mono<Integer> beforeCreateRecord(ClientRecord self) {
        return Mono.just((Object)1);
    }

    protected Mono<Integer> beforeRemoveClient(ClientRecord self) {
        return Mono.just((Object)1);
    }

    @Deprecated
    protected Mono<Integer> beforeUpdateCAS(ClientRecord self) {
        return Mono.just((Object)1);
    }

    protected Mono<Integer> beforeGetRecord(ClientRecord self) {
        return Mono.just((Object)1);
    }

    protected Mono<Integer> beforeUpdateRecord(ClientRecord self) {
        return Mono.just((Object)1);
    }
}

