侧边栏壁纸
  • 累计撰写 35 篇文章
  • 累计收到 12 条评论

Raft算法改进

zhenxi
2023-10-08 / 0 评论 / 115 阅读 / 搜一下 / 正在检测是否收录...

1.1 lu-raft介绍

项目介绍:一个Java 版本的 Raft(CP) KV 分布式存储实现。可用于 Raft 初学者深入学习 Raft 协议。项目完全是参照 RAFT 论文实现。

github地址:https://github.com/stateIs0/lu-raft-kv

选择原因:该项目使用 Java语言实现,并且RPC 网络通信框架使用的是蚂蚁金服 SOFA-Bolt,底层 KV 存储使用的是 RocksDB,其中核心的 Raft 则由我们自己实现。由于SOFAJraft中的代码太多了,不知道如何去入手,于是先使用简单的项目进行更改实现并做对比实验。

1.2 技术选型

实现目标:基于 Raft 论文实现 Raft 核心功能,即 Leader 选举 & 日志复制。

Raft 核心组件包括:一致性模块,RPC 通信,日志模块,状态机。

  • 一致性模块,是 Raft 算法的核心实现,通过一致性模块,保证 Raft 集群节点数据的一致性。这里我们需要自己根据论文描述去实现
  • RPC 通信,可以使用 HTTP 短连接,也可以直接使用 TCP 长连接,考虑到集群各个节点频繁通信,同时节点通常都在一个局域网内,因此我们选用 TCP 长连接。而 Java 社区长连接框架首选 Netty,这里我们选用蚂蚁金服网络通信框架 SOFA-Bolt(基于 Netty),便于快速开发。
  • 日志模块,Raft 算法中,日志实现是基础,考虑到时间因素,我们选用 RocksDB 作为日志存储。
  • 状态机,可以是任何实现,其实质就是将日志中的内容进行处理。可以理解为 Mysql binlog 中的具体数据。由于我们是要实现一个 KV 存储,那么可以直接使用日志模块的 RocksDB 组件。

1.3 项目接口设计

  • Consensus, 一致性模块接口
public interface Consensus {

    /**
     * 请求投票 RPC
     *
     * 接收者实现:
     *
     *      如果term < currentTerm返回 false (5.2 节)
     *      如果 votedFor 为空或者就是 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节)
     * @return
     */
    RvoteResult requestVote(RvoteParam param);

    /**
     * 附加日志(多个日志,为了提高效率) RPC
     *
     * 接收者实现:
     *
     *    如果 term < currentTerm 就返回 false (5.1 节)
     *    如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节)
     *    如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的 (5.3 节)
     *    附加任何在已有的日志中不存在的条目
     *    如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个
     * @return
     */
    AentryResult appendEntries(AentryParam param);
}

请求投票 & 附加日志。也就是 Raft 节点的核心功能,leader 选举和 日志复制。实现这两个接口是 Raft 的关键所在。

  • LogModule,日志模块接口
public interface LogModule {

    void write(LogEntry logEntry);

    LogEntry read(Long index);

    void removeOnStartIndex(Long startIndex);

    LogEntry getLast();

    Long getLastIndex();
}

分别是写,读,删,最后是两个关于 Last 的接口,在 Raft 中,Last 是一个非常关键的东西。

  • StateMachine, 状态机接口
public interface StateMachine {

    /**
     * 将数据应用到状态机.
     *
     * 原则上,只需这一个方法(apply). 其他的方法是为了更方便的使用状态机.
     * @param logEntry 日志中的数据.
     */
    void apply(LogEntry logEntry);

    LogEntry get(String key);

    String getString(String key);

    void setString(String key, String value);

    void delString(String... key);

}
  • RpcServer & RpcClient, RPC 接口
public interface RpcServer {

    void start();

    void stop();

    Response handlerRequest(Request request);

}
public interface RpcClient {

    Response send(Request request);

    Response send(Request request, int timeout);
}
  • Node,同时,为了聚合上面的几个接口,我们需要定义一个 Node 接口,即节点,Raft 抽象的机器节点。
public interface Node<T> extends LifeCycle{

    /**
     * 设置配置文件.
     *
     * @param config
     */
    void setConfig(NodeConfig config);

    /**
     * 处理请求投票 RPC.
     *
     * @param param
     * @return
     */
    RvoteResult handlerRequestVote(RvoteParam param);

    /**
     * 处理附加日志请求.
     *
     * @param param
     * @return
     */
    AentryResult handlerAppendEntries(AentryParam param);

    /**
     * 处理客户端请求.
     *
     * @param request
     * @return
     */
    ClientKVAck handlerClientRequest(ClientKVReq request);

    /**
     * 转发给 leader 节点.
     * @param request
     * @return
     */
    ClientKVAck redirect(ClientKVReq request);
}

首先,一个 Node 肯定需要配置文件,所以有一个 setConfig 接口,然后,肯定需要处理“请求投票”和“附加日志”,同时,还需要接收用户,也就是客户端的请求(不然数据从哪来?),所以有 handlerClientRequest 接口,最后,考虑到灵活性,我们让每个节点都可以接收客户端的请求,但 follower 节点并不能处理请求,所以需要重定向到 leader 节点,因此,我们需要一个重定向接口。

  • LifeCycle, 最后,我们需要管理以上组件的生命周期,因此需要一个 LifeCycle 接口。
public interface LifeCycle {

    void init() throws Throwable;

    void destroy() throws Throwable;
}

1.4 接口实现

Leader选举实现:

  1. 选举者必须不是 leader。
  2. 必须超时了才能选举,具体超时时间根据你的设计而定,注意,每个节点的超时时间不能相同,应当使用随机算法错开(Raft 关键实现),避免无谓的死锁。
  3. 选举者优先选举自己,将自己变成 candidate。
  4. 选举的第一步就是把自己的 term 加一。
  5. 然后像其他节点发送请求投票 RPC,请求参数参照论文,包括自身的 term,自身的 lastIndex,以及日志的 lastTerm。同时,请求投票 RPC 应该是并行请求的。
  6. 等待投票结果应该有超时控制,如果超时了,就不等待了。
  7. 最后,如果有超过半数的响应为 success,那么就需要立即变成 leader ,并发送心跳阻止其他选举。
  8. 如果失败了,就需要重新选举。注意,这个期间,如果有其他节点发送心跳,也需要立刻变成 follower,否则,将死循环。
/**
     * 1. 在转变成候选人后就立即开始选举过程
     *      自增当前的任期号(currentTerm)
     *      给自己投票
     *      重置选举超时计时器
     *      发送请求投票的 RPC 给其他所有服务器
     * 2. 如果接收到大多数服务器的选票,那么就变成领导人
     * 3. 如果接收到来自新的领导人的附加日志 RPC,转变成跟随者
     * 4. 如果选举过程超时,再次发起一轮选举
     */
class ElectionTask implements Runnable {

    @Override
    public void run() {
        // 如果已经是 leader 了, 就不需要再进行选举了.
        if (status == LEADER) {
            return;
        }
        // 获取当前时间
        long current = System.currentTimeMillis();
        // 基于 RAFT 的随机时间,解决冲突.
        // 随机时间,避免集群内多台机器同时发起选举.
        electionTime = electionTime + ThreadLocalRandom.current().nextInt(50);
        // 如果当前时间 - 上一次选举时间 < 选举时间间隔, 就不需要再进行选举了.
        if (current - preElectionTime < electionTime) {
            return;
        }
        // 如果大于选举时间间隔,则设置状态为候选人
        status = NodeStatus.CANDIDATE;
        // 输出日志,日志信息为当前节点从跟随者变成候选人
        LOGGER.error("node {} will become CANDIDATE and start election leader, current term : [{}], LastEntry : [{}]",
                     peerSet.getSelf(), currentTerm, logModule.getLast());
        // 设置上一次选举时间,计算方法为: 当前时间 + 150 + 200 之间的随机数
        preElectionTime = System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(200) + 150;
        // 任期 + 1
        currentTerm = currentTerm + 1;
        // 推荐自己.
        votedFor = peerSet.getSelf().getAddr();
        // 重置选举计时器
        List<Peer> peers = peerSet.getPeersWithOutSelf();
        // 选举投票
        ArrayList<Future> futureArrayList = new ArrayList<>();
        // 打印日志,日志信息为当前节点开始给其他节点发送投票请求
        LOGGER.info("peerList size : {}, peer list content : {}", peers.size(), peers);

        // 发送请求
        for (Peer peer : peers) {
            // 异步请求投票
            futureArrayList.add(RaftThreadPool.submit(new Callable() {
                // 异步请求投票
                @Override
                public Object call() throws Exception {
                    // 构建请求投票参数
                    long lastTerm = 0L;
                    // 获取最后一条日志
                    LogEntry last = logModule.getLast();
                    // 如果最后一条日志不为空,则获取最后一条日志的任期号
                    if (last != null) {
                        lastTerm = last.getTerm();
                    }
                    // 构建请求投票参数,主要有四个参数,分别为:候选人的任期号,候选人ID,候选人最后一条日志的索引值,候选人最后一条日志的任期号
                    RvoteParam param = RvoteParam.newBuilder().
                        term(currentTerm).
                        candidateId(peerSet.getSelf().getAddr()).
                        lastLogIndex(LongConvert.convert(logModule.getLastIndex())).
                        lastLogTerm(lastTerm).
                        build();
                    // 构建请求对象,主要有三个参数,分别为:请求的命令,请求的参数,请求的地址
                    Request request = Request.newBuilder()
                        .cmd(Request.R_VOTE)
                        .obj(param)
                        .url(peer.getAddr())
                        .build();
                    // 发送请求
                    try {
                        //调用rpcClient的send方法发送请求
                        @SuppressWarnings("unchecked")
                        Response<RvoteResult> response = getRpcClient().send(request);
                        return response;

                    } catch (RaftRemotingException e) {
                        // 如果发送失败,则打印日志,日志信息为请求投票失败
                        LOGGER.error("ElectionTask RPC Fail , URL : " + request.getUrl());
                        return null;
                    }
                }
            }));
        }

需要更改的地方:

2.可以根据信誉机制,将超时时间进行缩短,

electionTime = electionTime + ThreadLocalRandom.current().nextInt(50);

这句代码就是创建随机时间,通过信誉机制加权设计出随即时间,伪代码如下:

//信誉值
private Double reputation = Double.valueOf(0);
//存活时间
private Double survivalRate = Double.valueOf(0);
//Rpc时间
private Date RpcTime = Null;
//记录成功共识次数
private int successConsensus = 0;
//计算信誉值,信誉值等于存活时间*0.3+RPC时间*0.2+成功共识次数*0.5
public double calculateReputation(double survivalRate, Date RpcTime, int successConsensus){
    reputation = survivalRate*0.3+RpcTime*0.2+successConsensus*0.5;
    return reputation;
}
//计算存活率
private Instant startTime;
private Instant endTime;

// 节点启动时调用此方法,记录起始时间
public void startNode() {
    startTime = Instant.now();
}

// 节点关闭时调用此方法,记录结束时间
public void stopNode() {
    endTime = Instant.now();
}

// 计算节点存活时间,并返回存活率
public double calculateNodeSurvivalRate(long totalTime) {
    Duration nodeAliveDuration = Duration.between(startTime, endTime);
    double nodeAliveTime = nodeAliveDuration.toMillis();
    double survivalRate = (nodeAliveTime / totalTime) * 100;
    return survivalRate;
}

//获取总时间
public long getTotalTime() {
    long systemStartTime = System.currentTimeMillis(); // 获取系统运行时间的起始时间
    long totalTime = System.currentTimeMillis() - systemStartTime; // 计算系统运行时间
    return totalTime;
}

接着让随机时间设置为信誉值加权!

electionTime = electionTime + (ThreadLocalRandom.current().nextInt(50))*reputation;

这样的话,可以通过信誉值加权的方式使得信誉值较高的节点更有利获得候选人的机会。

5.也需要进行更改,需要把节点的信誉值也发过去。

7.这里没有考虑到平票的情况,如果发生平票问题比对信誉值!通过信誉值来直接确定谁是领导者!或者通过添加某个机器学习算法,比如通过决策树算法来通过上述计算信誉值加权的信息来进行预测分析。从而选出领导者。但是这需要有大量的数据。这个数据的来源肯定是我自己创作出来的,那如何保证这个数据的权威性呢???

接受者投票:

接收者在收到“请求投票” RPC 后,需要做以下事情:

  1. 注意,选举操作应该是串行的,因为涉及到状态修改,并发操作将导致数据错乱。也就是说,如果抢锁失败,应当立即返回错误。
  2. 首先判断对方的 term 是否小于自己,如果小于自己,直接返回失败。
  3. 如果当前节点没有投票给任何人,或者投的正好是对方,那么就可以比较日志的大小,反之,返回失败。
  4. 如果对方日志没有自己大,返回失败。反之,投票给对方,并变成 follower。变成 follower 的同时,异步的选举任务在最后从 condidate 变成 leader 之前,会判断是否是 follower,如果是 follower,就放弃成为 leader。这是一个兜底的措施。
/**
     * 请求投票 RPC
     *
     * 接收者实现:
     *      如果term < currentTerm返回 false (5.2 节)
     *      如果 votedFor 为空或者就是 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节)
     */
@Override
public RvoteResult requestVote(RvoteParam param) {
    try {
        // 重置选举时间
        RvoteResult.Builder builder = RvoteResult.newBuilder();
        if (!voteLock.tryLock()) {
            return builder.term(node.getCurrentTerm()).voteGranted(false).build();
        }

        // 如果对方任期没有自己新,拒绝投票
        if (param.getTerm() < node.getCurrentTerm()) {
            return builder.term(node.getCurrentTerm()).voteGranted(false).build();
        }

        // (当前节点并没有投票 或者 已经投票过了且是对方节点) && 对方日志和自己一样新,才能投票,打印日志,日志信息包括对方的任期号和自己的任期号
        LOGGER.info("node {} current vote for [{}], param candidateId : {}", node.peerSet.getSelf(), node.getVotedFor(), param.getCandidateId());
        LOGGER.info("node {} current term {}, peer term : {}", node.peerSet.getSelf(), node.getCurrentTerm(), param.getTerm());

        // 如果对方任期没有自己新,拒绝投票,返回自己的任期号
        if ((StringUtil.isNullOrEmpty(node.getVotedFor()) || node.getVotedFor().equals(param.getCandidateId()))) {
            //判断日志是否比自己新
            if (node.getLogModule().getLast() != null) {
                // 对方没有自己新
                if (node.getLogModule().getLast().getTerm() > param.getLastLogTerm()) {
                    return RvoteResult.fail();
                }
                // 对方没有自己新
                if (node.getLogModule().getLastIndex() > param.getLastLogIndex()) {
                    return RvoteResult.fail();
                }
            }

            // 切换状态
            node.status = NodeStatus.FOLLOWER;
            // 更新
            node.peerSet.setLeader(new Peer(param.getCandidateId()));
            node.setCurrentTerm(param.getTerm());
            node.setVotedFor(param.serverId);
            // 返回成功
            return builder.term(node.currentTerm).voteGranted(true).build();
        }

        return builder.term(node.currentTerm).voteGranted(false).build();

    } finally {
        voteLock.unlock();
    }
}

需要更改的地方:

可根据投票节点的信誉值设置加权!!!如果其信誉值越高则其投票加权越高。那这个代码需要改进的就很多了,作者应该是每一个节点就是一票来进行设计的。

心跳的实现:

日志复制是 Raft 实现一致性的核心。

日志复制有 2 种形式,1种是心跳,一种是真正的日志,心跳的日志内容是空的,其他部分基本相同,也就是说,接收方在收到日志时,如果发现是空的,那么他就是心跳。

心跳

既然是心跳,肯定就是个定时任务,和选举一样。在我们的实现中,我们每 5 秒发送一次心跳。注意点:

  1. 首先自己必须是 leader 才能发送心跳。
  2. 必须满足 5 秒的时间间隔。
  3. 并发的向其他 follower 节点发送心跳。
  4. 心跳参数包括自身的 ID,自身的 term,以便让对方检查 term,防止网络分区导致的脑裂。
  5. 如果任意 follower 的返回值的 term 大于自身,说明自己分区了,那么需要变成 follower,并更新自己的 term。然后重新发起选举。
// 心跳任务实现
class HeartBeatTask implements Runnable {

    @Override
    public void run() {
        // 如果不是 leader, 就不需要发送心跳了.
        if (status != LEADER) {
            return;
        }
        // 获取当前时间   
        long current = System.currentTimeMillis();
        // 如果当前时间 - 上一次心跳时间 < 心跳间隔, 就不需要发送心跳了.
        if (current - preHeartBeatTime < heartBeatTick) {
            return;
        }
        // 打印日志,日志信息为当前节点开始发送心跳
        LOGGER.info("=========== NextIndex =============");
        // 打印日志,日志具体信息为当前节点的 nextIndex 信息
        for (Peer peer : peerSet.getPeersWithOutSelf()) {
            LOGGER.info("Peer {} nextIndex={}", peer.getAddr(), nextIndexs.get(peer));
        }
        //  保存当前时间为上一次心跳时间
        preHeartBeatTime = System.currentTimeMillis();

        // 心跳只关心 term 和 leaderID
        // 构建附加日志请求参数,具体参数为:领导人的任期号,领导人ID,领导人已经提交的日志的索引值,领导人已经提交的日志的任期号
        for (Peer peer : peerSet.getPeersWithOutSelf()) {

            AentryParam param = AentryParam.newBuilder()
                .entries(null)// 心跳,空日志.
                .leaderId(peerSet.getSelf().getAddr())
                .serverId(peer.getAddr())
                .term(currentTerm)
                .build();
            // 构建请求对象,主要有三个参数,分别为:请求的命令,请求的参数,请求的地址
            Request<AentryParam> request = new Request<>(
                Request.A_ENTRIES,
                param,
                peer.getAddr());
            // 发送心跳请求
            RaftThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 调用rpcClient的send方法发送请求
                        Response response = getRpcClient().send(request);
                        // 获取响应结果
                        AentryResult aentryResult = (AentryResult) response.getResult();
                        // 从响应结果中获取任期号
                        long term = aentryResult.getTerm();
                        // 如果响应结果的任期号大于当前任期号,则将当前节点的状态转变为跟随者
                        if (term > currentTerm) {
                            LOGGER.error("self will become follower, he's term : {}, my term : {}", term, currentTerm);
                            currentTerm = term;
                            votedFor = "";
                            status = NodeStatus.FOLLOWER;
                        }
                    } catch (Exception e) {
                        // 如果发送失败,则打印日志,日志信息为心跳发送失败
                        LOGGER.error("HeartBeatTask RPC Fail, request URL : {} ", request.getUrl());
                    }
                }
            }, false);
        }
    }
}

需要更改的地方:

作者所写的心跳机制其实并没有解决节点分区的问题,而且按照我的创新点思考,我要选出多个分区,在多个分区中再次选举出组长来。领导者给每个组长发送心跳消息即可。小组组长分发给每个分区的其他小组成员。我的这个创新点也是间接的解决这个分区的问题,通过分区,从而解决分区。

如何进行分区???

(Chatgpt回答)在分布式系统中对节点进行分区可以采用多种算法。以下是一些常见的分区算法:

  1. 哈希分区:根据节点的标识或键值进行哈希计算,将结果映射到不同的分区中。这种方法保证了均匀的数据分布和负载平衡。
  2. 范围分区:根据节点标识或键值的范围将数据划分到不同的分区中。例如,可以根据节点的IP地址范围或者键值的大小范围来进行分区。
  3. 一致性哈希:通过哈希函数将节点和数据映射到一个环状空间上,每个节点在环上占据一个位置。数据根据其哈希值在环上找到离其最近的节点作为所属分区的负责节点。这种方法在节点动态加入或离开时可以有效减少数据迁移。
  4. 拓扑感知分区:该算法考虑到节点之间的网络拓扑结构,并尽可能将在物理位置上相邻的节点放在同一分区中。这有助于减少网络延迟和提高系统性能。
  5. 副本分区:将数据复制到多个节点,以提高系统的可用性和容错性。可以采用主从复制、多主复制或者基于一致性机制的复制方法,并结合其他分区算法来确定数据在哪些节点上进行复制。

心跳接收者的实现

  1. 无论成功失败首先设置返回值,也就是将自己的 term 返回给 leader。
  2. 判断对方的 term 是否大于自身,如果大于自身,变成 follower,防止异步的选举任务误操作。同时更新选举时间和心跳时间。
  3. 如果对方 term 小于自身,返回失败。不更新选举时间和心跳时间。以便触发选举。
AentryResult result = AentryResult.fail();
try {
    //如果没得到锁, 直接返回.
    if (!appendLock.tryLock()) {
        return result;
    }
    //无论成功失败首先设置返回值,也就是将自己的 term 返回给 leader。
    result.setTerm(node.getCurrentTerm());
    // 判断对方的 term 是否大于自身
    if (param.getTerm() < node.getCurrentTerm()) {
        return result;
    }
    //获取当前时间并赋值给上次收到 leader 的心跳时间
    node.preHeartBeatTime = System.currentTimeMillis();
    //获取当前时间并赋值给上次收到 leader 的附加日志时间
    node.preElectionTime = System.currentTimeMillis();
    // 设置 leader
    node.peerSet.setLeader(new Peer(param.getLeaderId()));

    // 如果对方的 term 大于自身, 则变为 follower
    if (param.getTerm() >= node.getCurrentTerm()) {
        LOGGER.debug("node {} become FOLLOWER, currentTerm : {}, param Term : {}, param serverId",
                     node.peerSet.getSelf(), node.currentTerm, param.getTerm(), param.getServerId());
        // 认怂
        node.status = NodeStatus.FOLLOWER;
    }
// 使用对方的 term.
node.setCurrentTerm(param.getTerm());

日志复制实现:

当用户向 Leader 发送一个 KV 数据,那么 Leader 需要将 KV数据封装成日志,并行的发送到其他的 follower 节点,只要在指定的超时时间内,有过半几点返回成功,那么久提交(持久化)这条日志,返回客户端成功,否者返回失败。

因此,Leader 节点会有一个 ClientKVAck handlerClientRequest(ClientKVReq request) 接口,用于接收用户的 KV 数据,同时,会并行向其他节点复制数据,具体步骤如下:

  1. 每个节点都可能会接收到客户端的请求,但只有 leader 能处理,所以如果自身不是 leader,则需要转发给 leader。
  2. 然后将用户的 KV 数据封装成日志结构,包括 term,index,command,预提交到本地。
  3. 并行的向其他节点发送数据,也就是日志复制。
  4. 如果在指定的时间内,过半节点返回成功,那么就提交这条日志。
  5. 最后,更新自己的 commitIndex,lastApplied 等信息。

注意,复制不仅仅是简单的将这条日志发送到其他节点,这可能比我们想象的复杂,为了保证复杂网络环境下的一致性,Raft 保存了每个节点的成功复制过的日志的 index,即 nextIndex ,因此,如果对方之前一段时间宕机了,那么,从宕机那一刻开始,到当前这段时间的所有日志,都要发送给对方。

/**
     * 客户端的每一个请求都包含一条被复制状态机执行的指令。
     * 领导人把这条指令作为一条新的日志条目附加到日志中去,然后并行的发起附加条目 RPCs 给其他的服务器,让他们复制这条日志条目。
     * 当这条日志条目被安全的复制(下面会介绍),领导人会应用这条日志条目到它的状态机中然后把执行的结果返回给客户端。
     * 如果跟随者崩溃或者运行缓慢,再或者网络丢包,
     *  领导人会不断的重复尝试附加日志条目 RPCs (尽管已经回复了客户端)直到所有的跟随者都最终存储了所有的日志条目。
     * @param request
     * @return
  */
@Override
public synchronized ClientKVAck handlerClientRequest(ClientKVReq request) {
    //输出日志,日志信息为:接收到客户端请求,请求内容为:请求内容,当前状态为:当前状态
    LOGGER.warn("handlerClientRequest handler {} operation,  and key : [{}], value : [{}]",
                ClientKVReq.Type.value(request.getType()), request.getKey(), request.getValue());
    //如果当前节点不是领导人,就重定向到领导人
    if (status != LEADER) {
        LOGGER.warn("I not am leader , only invoke redirect method, leader addr : {}, my addr : {}",
                    peerSet.getLeader(), peerSet.getSelf().getAddr());
        return redirect(request);
    }
    //如果当前节点是领导人,就将请求转发给状态机,判断请求类型是否为GET
    if (request.getType() == ClientKVReq.GET) {
        //如果是GET请求,就返回状态机中的值
        LogEntry logEntry = stateMachine.get(request.getKey());
        //如果状态机中的值不为空,就返回状态机中的值
        if (logEntry != null) {
            return new ClientKVAck(logEntry.getCommand());
        }
        //如果状态机中的值为空,就返回空
        return new ClientKVAck(null);
    }
    //LogEntry是日志条目,其中包含了命令和任期号。这里的命令是指客户端的请求,任期号是指当前节点的任期号
    LogEntry logEntry = LogEntry.newBuilder()
        .command(Command.newBuilder().
                 key(request.getKey()).
                 value(request.getValue()).
                 build())
        .term(currentTerm)
        .build();

    // 预提交到本地日志, TODO 预提交
    //使用write方法将日志条目写入到本地日志中
    logModule.write(logEntry);
    LOGGER.info("write logModule success, logEntry info : {}, log index : {}", logEntry, logEntry.getIndex());
    final AtomicInteger success = new AtomicInteger(0);
    //创造一个Future对象,用于存储复制日志的结果
    List<Future<Boolean>> futureList = new CopyOnWriteArrayList<>();

    int count = 0;
    //  复制到其他机器,这里的peerSet.getPeersWithOutSelf()是获取除了自己以外的所有节点
    for (Peer peer : peerSet.getPeersWithOutSelf()) {
        // TODO check self and RaftThreadPool
        count++;
        // 并行发起 RPC 复制.
        futureList.add(replication(peer, logEntry));
    }
    // 用于等待所有的复制日志的结果返回
    CountDownLatch latch = new CountDownLatch(futureList.size());

    List<Boolean> resultList = new CopyOnWriteArrayList<>();

    getRPCAppendResult(futureList, latch, resultList);

    try {
        latch.await(4000, MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    for (Boolean aBoolean : resultList) {
        if (aBoolean) {
            success.incrementAndGet();
        }
    }

    // 如果存在一个满足N > commitIndex的 N,并且大多数的matchIndex[i] ≥ N成立,
    // 并且log[N].term == currentTerm成立,那么令 commitIndex 等于这个 N (5.3 和 5.4 节)
    List<Long> matchIndexList = new ArrayList<>(matchIndexs.values());
    // 小于 2, 没有意义
    int median = 0;
    if (matchIndexList.size() >= 2) {
        Collections.sort(matchIndexList);
        median = matchIndexList.size() / 2;
    }
    Long N = matchIndexList.get(median);
    if (N > commitIndex) {
        LogEntry entry = logModule.read(N);
        if (entry != null && entry.getTerm() == currentTerm) {
            commitIndex = N;
        }
    }

    //  响应客户端(成功一半)
    if (success.get() >= (count / 2)) {
        // 更新
        commitIndex = logEntry.getIndex();
        //  应用到状态机
        getStateMachine().apply(logEntry);
        lastApplied = commitIndex;

        LOGGER.info("success apply local state machine,  logEntry info : {}", logEntry);
        // 返回成功.
        return ClientKVAck.ok();
    } else {
        // 回滚已经提交的日志.
        logModule.removeOnStartIndex(logEntry.getIndex());
        LOGGER.warn("fail apply local state  machine,  logEntry info : {}", logEntry);
        // TODO 不应用到状态机,但已经记录到日志中.由定时任务从重试队列取出,然后重复尝试,当达到条件时,应用到状态机.
        // 这里应该返回错误, 因为没有成功复制过半机器.
        return ClientKVAck.fail();
    }
}

需要更改的地方:

领导者需要先发送给组长,组长再发送给小组成员。设置一个组长指定时间t,在这一段时间内,如果组长收到大量的返回成功,那它就给领导者返回成功,在指定时间T(T>t)如果有过半的组长发送返回成功。则提交这条日志。

日志接收者的实现步骤:

  1. 和心跳一样,要先检查对方 term,如果 term 都不对,那么就没什么好说的了。
  2. 如果日志不匹配,那么返回 leader,告诉他,减小 nextIndex 重试。
  3. 如果本地存在的日志和 leader 的日志冲突了,以 leader 的为准,删除自身的。
  4. 最后,将日志应用到状态机,更新本地的 commitIndex,返回 leader 成功。
// 真实日志
// 第一次
if (node.getLogModule().getLastIndex() != 0 && param.getPrevLogIndex() != 0) {
    LogEntry logEntry;
    if ((logEntry = node.getLogModule().read(param.getPrevLogIndex())) != null) {
        // 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false
        // 需要减小 nextIndex 重试.
        if (logEntry.getTerm() != param.getPreLogTerm()) {
            return result;
        }
    } else {
        // index 不对, 需要递减 nextIndex 重试.
        return result;
    }

}

// 如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的
LogEntry existLog = node.getLogModule().read(((param.getPrevLogIndex() + 1)));
if (existLog != null && existLog.getTerm() != param.getEntries()[0].getTerm()) {
    // 删除这一条和之后所有的, 然后写入日志和状态机.
    node.getLogModule().removeOnStartIndex(param.getPrevLogIndex() + 1);
} else if (existLog != null) {
    // 已经有日志了, 不能重复写入.
    result.setSuccess(true);
    return result;
}

// 写进日志并且应用到状态机
for (LogEntry entry : param.getEntries()) {
    node.getLogModule().write(entry);
    node.stateMachine.apply(entry);
    result.setSuccess(true);
}

//如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个
if (param.getLeaderCommit() > node.getCommitIndex()) {
    int commitIndex = (int) Math.min(param.getLeaderCommit(), node.getLogModule().getLastIndex());
    node.setCommitIndex(commitIndex);
    node.setLastApplied(commitIndex);
}

result.setTerm(node.getCurrentTerm());

node.status = NodeStatus.FOLLOWER;
// TODO, 是否应当在成功回复之后, 才正式提交? 防止 leader "等待回复"过程中 挂掉.
return result;
} finally {
    appendLock.unlock();
}

需要更改的地方:

如何保证组长的日志不会缺少呢?组长是动态选举出来的,可以解决。

1.5 程序运行

验证 “Leader 选举”

  • 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加入 -DserverPort=8775 -DserverPort=8776 -DserverPort=8777 -DserverPort=8778 -DserverPort=8779系统配置, 表示分布式环境下的 5 个机器节点.
  • 依次启动 5 个 RaftNodeBootStrap 节点, 端口分别是 8775,8776, 8777, 8778, 8779.

GIF 2023-10-9 10-44-06

  • 观察控制台, 约 6 秒后, 会发生选举事件,此时,会产生一个 leader. 而 leader 会立刻发送心跳维持自己的地位.

image-20231009105137605

image-20231009105246188

  • 如果leader 的端口是 8775, 使用 idea 关闭 8775 端口,模拟节点挂掉, 大约 15 秒后, 会重新开始选举, 并且会在剩余的 4 个节点中,产生一个新的 leader. 并开始发送心跳日志。

image-20231009105539131

image-20231009105532214

验证日志复制

正常状态下

  1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加入 -DserverPort=8775 -DserverPort=8776 -DserverPort=8777 -DserverPort=8778 -DserverPort=8779
  2. 依次启动 5 个 RaftNodeBootStrap 节点, 端口分别是 8775,8776, 8777, 8778, 8779.
  3. 使用客户端写入 kv 数据.
  4. 杀掉所有节点, 使用 junit test 读取每个 rocksDB 的值, 验证每个节点的数据是否一致.

非正常状态下

  1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加入 -DserverPort=8775 -DserverPort=8776 -DserverPort=8777 -DserverPort=8778 -DserverPort=8779
  2. 依次启动 5 个 RaftNodeBootStrap 节点, 端口分别是 8775,8776, 8777, 8778, 8779.
  3. 使用客户端写入 kv 数据.
  4. 杀掉 leader (假设是 8775).
  5. 再次写入数据.
  6. 重启 8775.
  7. 关闭所有节点, 读取 RocksDB 验证数据一致性.

image-20231009110138632

image-20231009110459493

版权属于: 莫那·鲁道

本文原文链接: http://thinkinjava.cn/2019/01/12/2019/2019-01-12-lu-raft-kv/

作品采用: 《 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 》许可协议授权

3

评论

博主关闭了所有页面的评论