/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.slf4j.Logger;

@InterfaceAudience.Private
@VisibleForTesting
public class JournalNodeRpcServer
implements QJournalProtocol,
InterQJournalProtocol {
    private static final Logger LOG = JournalNode.LOG;
    private static final int HANDLER_COUNT = 5;
    private final JournalNode jn;
    private RPC.Server server;

    JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
        this.jn = jn;
        Configuration confCopy = new Configuration(conf);
        confCopy.setBoolean("ipc.server.tcpnodelay", true);
        InetSocketAddress addr = JournalNodeRpcServer.getAddress(confCopy);
        String bindHost = conf.getTrimmed("dfs.journalnode.rpc-bind-host", null);
        if (bindHost == null) {
            bindHost = addr.getHostName();
        }
        LOG.info("RPC server is binding to " + bindHost + ":" + addr.getPort());
        RPC.setProtocolEngine((Configuration)confCopy, QJournalProtocolPB.class, ProtobufRpcEngine2.class);
        QJournalProtocolServerSideTranslatorPB translator = new QJournalProtocolServerSideTranslatorPB(this);
        BlockingService service = QJournalProtocolProtos.QJournalProtocolService.newReflectiveBlockingService(translator);
        this.server = new RPC.Builder(confCopy).setProtocol(QJournalProtocolPB.class).setInstance((Object)service).setBindAddress(bindHost).setPort(addr.getPort()).setNumHandlers(5).setVerbose(false).build();
        InterQJournalProtocolServerSideTranslatorPB qJournalProtocolServerSideTranslatorPB = new InterQJournalProtocolServerSideTranslatorPB(this);
        BlockingService interQJournalProtocolService = InterQJournalProtocolProtos.InterQJournalProtocolService.newReflectiveBlockingService(qJournalProtocolServerSideTranslatorPB);
        DFSUtil.addPBProtocol(confCopy, InterQJournalProtocolPB.class, interQJournalProtocolService, this.server);
        if (confCopy.getBoolean("hadoop.security.authorization", false)) {
            this.server.refreshServiceAcl(confCopy, (PolicyProvider)new HDFSPolicyProvider());
        }
        this.server.setTracer(jn.tracer);
    }

    void start() {
        this.server.start();
    }

    public InetSocketAddress getAddress() {
        return this.server.getListenerAddress();
    }

    void join() throws InterruptedException {
        this.server.join();
    }

    void stop() {
        this.server.stop();
    }

    static InetSocketAddress getAddress(Configuration conf) {
        String addr = conf.get("dfs.journalnode.rpc-address", "0.0.0.0:8485");
        return NetUtils.createSocketAddr((String)addr, (int)0, (String)"dfs.journalnode.rpc-address");
    }

    @Override
    public boolean isFormatted(String journalId, String nameServiceId) throws IOException {
        return this.jn.getOrCreateJournal(journalId, nameServiceId).isFormatted();
    }

    @Override
    public QJournalProtocolProtos.GetJournalStateResponseProto getJournalState(String journalId, String nameServiceId) throws IOException {
        long epoch = this.jn.getOrCreateJournal(journalId, nameServiceId).getLastPromisedEpoch();
        return QJournalProtocolProtos.GetJournalStateResponseProto.newBuilder().setLastPromisedEpoch(epoch).setHttpPort(this.jn.getBoundHttpAddress().getPort()).setFromURL(this.jn.getHttpServerURI()).build();
    }

    @Override
    public QJournalProtocolProtos.NewEpochResponseProto newEpoch(String journalId, String nameServiceId, NamespaceInfo nsInfo, long epoch) throws IOException {
        return this.jn.getOrCreateJournal(journalId, nameServiceId).newEpoch(nsInfo, epoch);
    }

    @Override
    public void format(String journalId, String nameServiceId, NamespaceInfo nsInfo, boolean force) throws IOException {
        this.jn.getOrCreateJournal(journalId, nameServiceId).format(nsInfo, force);
    }

    @Override
    public void journal(RequestInfo reqInfo, long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException {
        this.jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()).journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
    }

    @Override
    public void heartbeat(RequestInfo reqInfo) throws IOException {
        this.jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()).heartbeat(reqInfo);
    }

    @Override
    public void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion) throws IOException {
        this.jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()).startLogSegment(reqInfo, txid, layoutVersion);
    }

    @Override
    public void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException {
        this.jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()).finalizeLogSegment(reqInfo, startTxId, endTxId);
    }

    @Override
    public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep) throws IOException {
        this.jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()).purgeLogsOlderThan(reqInfo, minTxIdToKeep);
    }

    @Override
    public QJournalProtocolProtos.GetEditLogManifestResponseProto getEditLogManifest(String jid, String nameServiceId, long sinceTxId, boolean inProgressOk) throws IOException {
        RemoteEditLogManifest manifest = this.jn.getOrCreateJournal(jid, nameServiceId).getEditLogManifest(sinceTxId, inProgressOk);
        return QJournalProtocolProtos.GetEditLogManifestResponseProto.newBuilder().setManifest(PBHelper.convert(manifest)).setHttpPort(this.jn.getBoundHttpAddress().getPort()).setFromURL(this.jn.getHttpServerURI()).build();
    }

    @Override
    public QJournalProtocolProtos.GetJournaledEditsResponseProto getJournaledEdits(String jid, String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
        return this.jn.getOrCreateJournal(jid, nameServiceId).getJournaledEdits(sinceTxId, maxTxns);
    }

    @Override
    public QJournalProtocolProtos.PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo, long segmentTxId) throws IOException {
        return this.jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()).prepareRecovery(reqInfo, segmentTxId);
    }

    @Override
    public void acceptRecovery(RequestInfo reqInfo, QJournalProtocolProtos.SegmentStateProto log, URL fromUrl) throws IOException {
        this.jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()).acceptRecovery(reqInfo, log, fromUrl);
    }

    @Override
    public void doPreUpgrade(String journalId) throws IOException {
        this.jn.doPreUpgrade(journalId);
    }

    @Override
    public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
        this.jn.doUpgrade(journalId, sInfo);
    }

    @Override
    public void doFinalize(String journalId, String nameServiceId) throws IOException {
        this.jn.doFinalize(journalId, nameServiceId);
    }

    @Override
    public Boolean canRollBack(String journalId, String nameServiceId, StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
        return this.jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion, nameServiceId);
    }

    @Override
    public void doRollback(String journalId, String nameServiceId) throws IOException {
        this.jn.doRollback(journalId, nameServiceId);
    }

    @Override
    public void discardSegments(String journalId, String nameServiceId, long startTxId) throws IOException {
        this.jn.discardSegments(journalId, startTxId, nameServiceId);
    }

    @Override
    public Long getJournalCTime(String journalId, String nameServiceId) throws IOException {
        return this.jn.getJournalCTime(journalId, nameServiceId);
    }

    @Override
    public QJournalProtocolProtos.GetEditLogManifestResponseProto getEditLogManifestFromJournal(String jid, String nameServiceId, long sinceTxId, boolean inProgressOk) throws IOException {
        RemoteEditLogManifest manifest = this.jn.getOrCreateJournal(jid, nameServiceId).getEditLogManifest(sinceTxId, inProgressOk);
        return QJournalProtocolProtos.GetEditLogManifestResponseProto.newBuilder().setManifest(PBHelper.convert(manifest)).setHttpPort(this.jn.getBoundHttpAddress().getPort()).setFromURL(this.jn.getHttpServerURI()).build();
    }

    @VisibleForTesting
    RPC.Server getRpcServer() {
        return this.server;
    }
}

