/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.catchup;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.BindException;
import java.net.SocketAddress;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import org.neo4j.causalclustering.VersionDecoder;
import org.neo4j.causalclustering.VersionPrepender;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.RequestDecoderDispatcher;
import org.neo4j.causalclustering.catchup.RequestMessageTypeEncoder;
import org.neo4j.causalclustering.catchup.ResponseMessageTypeEncoder;
import org.neo4j.causalclustering.catchup.ServerMessageTypeHandler;
import org.neo4j.causalclustering.catchup.SimpleRequestDecoder;
import org.neo4j.causalclustering.catchup.storecopy.FileChunkEncoder;
import org.neo4j.causalclustering.catchup.storecopy.FileHeaderEncoder;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreIdRequest;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreIdRequestHandler;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreIdResponseEncoder;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreRequestDecoder;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreRequestHandler;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponseEncoder;
import org.neo4j.causalclustering.catchup.tx.TxPullRequestDecoder;
import org.neo4j.causalclustering.catchup.tx.TxPullRequestHandler;
import org.neo4j.causalclustering.catchup.tx.TxPullResponseEncoder;
import org.neo4j.causalclustering.catchup.tx.TxStreamFinishedResponseEncoder;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.state.CoreSnapshotService;
import org.neo4j.causalclustering.core.state.snapshot.CoreSnapshotEncoder;
import org.neo4j.causalclustering.core.state.snapshot.CoreSnapshotRequest;
import org.neo4j.causalclustering.core.state.snapshot.CoreSnapshotRequestHandler;
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.function.Factory;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;

/**
 * Netty server that answers catch-up requests (transaction pulls, store copies, store id lookups
 * and, when a {@link CoreSnapshotService} is supplied, core snapshot requests).
 *
 * <p>The server binds to the address configured by
 * {@link CausalClusteringSettings#transaction_listen_address}. {@link #start()} and {@link #stop()}
 * are synchronized on this instance and are no-ops when the server is already in the requested state.
 */
public class CatchupServer
extends LifecycleAdapter {
    private final LogProvider logProvider;
    private final Log log;
    private final Log userLog;
    private final Monitors monitors;
    private final Supplier<StoreId> storeIdSupplier;
    private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
    private final Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier;
    private final Supplier<NeoStoreDataSource> dataSourceSupplier;
    private final BooleanSupplier dataSourceAvailabilitySupplier;
    private final FileSystemAbstraction fs;
    private final PageCache pageCache;
    private final SslPolicy sslPolicy;
    private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
    private final NamedThreadFactory threadFactory = new NamedThreadFactory("catchup-server");
    private final CoreSnapshotService snapshotService;
    private final ListenSocketAddress listenAddress;
    private EventLoopGroup workerGroup;
    private Channel channel;
    private final Supplier<CheckPointer> checkPointerSupplier;

    /**
     * Creates a server that is not yet listening; call {@link #start()} to bind it.
     *
     * @param logProvider provider for the internal log and for the logs handed to pipeline handlers
     * @param userLogProvider provider for the user-facing log
     * @param storeIdSupplier supplies the store id to the tx pull and store id handlers
     * @param transactionIdStoreSupplier supplies the transaction id store to the tx pull handler
     * @param logicalTransactionStoreSupplier supplies the logical transaction store to the tx pull handler
     * @param dataSourceSupplier supplies the data source to the store copy handler
     * @param dataSourceAvailabilitySupplier availability flag handed to the tx pull handler
     * @param snapshotService core snapshot service; when {@code null} no snapshot handler is installed
     * @param config configuration from which the listen address is read
     * @param monitors source of the exception monitor installed in each pipeline
     * @param checkPointerSupplier supplies the check pointer to the store copy handler
     * @param fs file system handed to the store copy handler
     * @param pageCache page cache handed to the store copy handler
     * @param storeCopyCheckPointMutex mutex handed to the store copy handler
     * @param sslPolicy SSL policy; when {@code null} no SSL handler is installed
     */
    public CatchupServer(LogProvider logProvider, LogProvider userLogProvider, Supplier<StoreId> storeIdSupplier, Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier, Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, CoreSnapshotService snapshotService, Config config, Monitors monitors, Supplier<CheckPointer> checkPointerSupplier, FileSystemAbstraction fs, PageCache pageCache, StoreCopyCheckPointMutex storeCopyCheckPointMutex, SslPolicy sslPolicy) {
        this.snapshotService = snapshotService;
        this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
        this.listenAddress = (ListenSocketAddress)config.get(CausalClusteringSettings.transaction_listen_address);
        this.transactionIdStoreSupplier = transactionIdStoreSupplier;
        this.storeIdSupplier = storeIdSupplier;
        this.dataSourceAvailabilitySupplier = dataSourceAvailabilitySupplier;
        this.logicalTransactionStoreSupplier = logicalTransactionStoreSupplier;
        this.logProvider = logProvider;
        this.monitors = monitors;
        this.log = logProvider.getLog(getClass());
        this.userLog = userLogProvider.getLog(getClass());
        this.dataSourceSupplier = dataSourceSupplier;
        this.checkPointerSupplier = checkPointerSupplier;
        this.fs = fs;
        this.pageCache = pageCache;
        this.sslPolicy = sslPolicy;
    }

    /**
     * Binds the server to the configured listen address. Does nothing if a channel is already bound.
     *
     * @throws Throwable the bind failure; the worker group created for this attempt is shut down
     *         before the failure is rethrown, so a failed start leaves no event loop threads behind
     */
    @Override
    public synchronized void start() throws Throwable {
        if (this.channel != null) {
            return;
        }
        this.workerGroup = new NioEventLoopGroup(0, this.threadFactory);
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(this.workerGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(this.listenAddress.socketAddress())
                .childHandler(new ChannelInitializer<SocketChannel>(){

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        CatchupServer.this.configurePipeline(ch);
                    }
                });
        try {
            this.channel = bootstrap.bind().syncUninterruptibly().channel();
        }
        catch (Exception e) {
            // syncUninterruptibly() rethrows the cause of a failed bind without declaring it,
            // so the checked BindException has to be detected with instanceof.
            if (e instanceof BindException) {
                String message = "Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address + " with value: " + this.listenAddress;
                this.userLog.error(message);
                this.log.error(message, (Throwable)e);
            }
            // Release the event loop threads created above; stop() skips cleanup when no channel was bound.
            shutdownWorkerGroup();
            // Rethrow every failure, not only BindException, so a server that is not listening is never reported as started.
            throw e;
        }
    }

    /**
     * Installs the full handler chain on a newly accepted connection. Order matters: optional SSL,
     * framing, versioning, response encoders, request type dispatch and decoding, request handlers,
     * and finally the exception handlers.
     */
    private void configurePipeline(SocketChannel ch) {
        CatchupServerProtocol protocol = new CatchupServerProtocol();
        ChannelPipeline pipeline = ch.pipeline();
        if (this.sslPolicy != null) {
            pipeline.addLast(this.sslPolicy.nettyServerHandler(ch));
        }
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new VersionDecoder(this.logProvider));
        pipeline.addLast(new VersionPrepender());
        pipeline.addLast(new ResponseMessageTypeEncoder());
        pipeline.addLast(new RequestMessageTypeEncoder());
        pipeline.addLast(new TxPullResponseEncoder());
        pipeline.addLast(new CoreSnapshotEncoder());
        pipeline.addLast(new GetStoreIdResponseEncoder());
        pipeline.addLast(new StoreCopyFinishedResponseEncoder());
        pipeline.addLast(new TxStreamFinishedResponseEncoder());
        pipeline.addLast(new FileChunkEncoder());
        pipeline.addLast(new FileHeaderEncoder());
        pipeline.addLast(new ServerMessageTypeHandler(protocol, this.logProvider));
        pipeline.addLast(decoders(protocol));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new TxPullRequestHandler(protocol, this.storeIdSupplier, this.dataSourceAvailabilitySupplier, this.transactionIdStoreSupplier, this.logicalTransactionStoreSupplier, this.monitors, this.logProvider));
        pipeline.addLast(new GetStoreRequestHandler(protocol, this.dataSourceSupplier, this.checkPointerSupplier, this.fs, this.pageCache, this.logProvider, this.storeCopyCheckPointMutex));
        pipeline.addLast(new GetStoreIdRequestHandler(protocol, this.storeIdSupplier));
        if (this.snapshotService != null) {
            pipeline.addLast(new CoreSnapshotRequestHandler(protocol, this.snapshotService));
        }
        pipeline.addLast(new ExceptionLoggingHandler(this.log));
        pipeline.addLast(new ExceptionMonitoringHandler(this.monitors.newMonitor(ExceptionMonitoringHandler.Monitor.class, CatchupServer.class)));
        pipeline.addLast(new ExceptionSwallowingHandler());
    }

    /**
     * Builds the dispatcher that picks a request decoder based on the current protocol state.
     *
     * @param protocol per-connection protocol state shared with the request handlers
     * @return a dispatcher with one decoder registered for each supported request state
     */
    private ChannelInboundHandler decoders(CatchupServerProtocol protocol) {
        RequestDecoderDispatcher<CatchupServerProtocol.State> decoderDispatcher = new RequestDecoderDispatcher<>(protocol, this.logProvider);
        decoderDispatcher.register(CatchupServerProtocol.State.TX_PULL, new TxPullRequestDecoder());
        decoderDispatcher.register(CatchupServerProtocol.State.GET_STORE, new GetStoreRequestDecoder());
        decoderDispatcher.register(CatchupServerProtocol.State.GET_STORE_ID, new SimpleRequestDecoder(GetStoreIdRequest::new));
        decoderDispatcher.register(CatchupServerProtocol.State.GET_CORE_SNAPSHOT, new SimpleRequestDecoder(CoreSnapshotRequest::new));
        return decoderDispatcher;
    }

    /**
     * Closes the listening channel and shuts down the worker group. Does nothing if no channel is bound.
     *
     * <p>If the calling thread is interrupted while the channel closes, the interrupt flag is
     * restored and shutdown continues. The channel reference is cleared and the worker group is
     * shut down even when closing fails, so the server can be started again afterwards.
     *
     * @throws Throwable any failure other than interruption reported while closing the channel
     */
    @Override
    public synchronized void stop() throws Throwable {
        if (this.channel == null) {
            return;
        }
        this.log.info("CatchupServer stopping and unbinding from " + this.listenAddress);
        try {
            this.channel.close().sync();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.log.warn("Interrupted while closing channel.");
        }
        finally {
            // Clear the reference on every path; a stale channel would make start() return early forever.
            this.channel = null;
            shutdownWorkerGroup();
        }
    }

    /**
     * Shuts down the worker group, if there is one, waiting up to 10 seconds for it to terminate,
     * and clears the reference. A warning is logged only when termination did not complete in time.
     */
    private void shutdownWorkerGroup() {
        if (this.workerGroup == null) {
            return;
        }
        // awaitUninterruptibly returns true when the future completed within the timeout.
        boolean terminated = this.workerGroup.shutdownGracefully(2L, 5L, TimeUnit.SECONDS).awaitUninterruptibly(10L, TimeUnit.SECONDS);
        if (!terminated) {
            this.log.warn("Worker group not shutdown within 10 seconds.");
        }
        this.workerGroup = null;
    }
}

