0%

用深度强化学习算法 deep Q-learning 玩 CartPole 游戏。

强化学习是机器学习的一个重要分支,通过强化学习我们可以创建一个 agent,让它与环境不断地互动,不断试错,自主地从中学习到知识,进而做出决策。

如图所示,agent 收到环境的状态 state,做出行动 action,行动后会得到一个反馈,反馈包括奖励 reward 和环境的下一个状态 next_state
这样一轮操作下来,agent 便可以积累经验,并且从中训练,学习该如何根据 state 选择合适的 action 来获得较好的 reward 以获得游戏的最终胜利。

image-20220419102819294

在强化学习中有一个著名算法 Q-learning

image-20220419102848954

2013 年,Google DeepMind 发表了论文 Playing Atari with Deep Reinforcement Learning,开辟了一个新的领域,深度学习和强化学习的结合,即深度强化学习。 其中介绍了 Deep Q Network,这个深度强化学习网络可以让 agent 仅仅通过观察屏幕就能学会玩游戏,不需要知道关于这个游戏的任何信息。

在 Q-Learning 算法中,是通过一个 Q 函数,来估计对一个状态采取一个行动后所能得到的奖励 Q(s,a),
在 Deep Q Network 中,是用一个神经网络来估计这个奖励。


接下来我们用一个很简单的游戏来看 Deep Q Network 是如何应用的。

CartPole 这个游戏的目标是要使小车上面的杆保持平衡

state 包含四个信息:小车的位置,车速,杆的角度,杆尖端的速度
agent 的行动 action 包括两种:向左推车,向右推车

  • 在每轮游戏开始时,环境有一个初始的状态,
  • agent 根据状态采取一个行动 action = agent.act(state)
  • 这个 action 使得游戏进入下一个状态 next_state,并且拿到了奖励 reward,next_state, reward, done, _ = env.step(action)
  • 然后 agent 会将之前的经验记录下来 agent.remember(state, action, reward, next_state, done)
  • 当经验积累到一定程度后,agent 就从经验中学习改进 agent.replay(batch_size)
  • 如果游戏结束了就打印一下所得分数,
    没有结束就更新一下状态后继续游戏 state = next_state

image-20220419102909036

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
if __name__ == "__main__":

# 初始化 gym 环境和 agent
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

done = False
batch_size = 32

# 开始迭代游戏
for e in range(EPISODES):

# 每次游戏开始时都重新设置一下状态
state = env.reset()
state = np.reshape(state, [1, state_size])

# time 代表游戏的每一帧,
# 每成功保持杆平衡一次得分就加 1,最高到 500 分,
# 目标是希望分数越高越好
for time in range(500):
# 每一帧时,agent 根据 state 选择 action
action = agent.act(state)
# 这个 action 使得游戏进入下一个状态 next_state,并且拿到了奖励 reward
# 如果杆依旧平衡则 reward 为 1,游戏结束则为 -10
next_state, reward, done, _ = env.step(action)
reward = reward if not done else -10
next_state = np.reshape(next_state, [1, state_size])

# 记忆之前的信息:state, action, reward, and done
agent.remember(state, action, reward, next_state, done)

# 更新下一帧的所在状态
state = next_state

# 如果杆倒了,则游戏结束,打印分数
if done:
print("episode: {}/{}, score: {}, e: {:.2}"
.format(e, EPISODES, time, agent.epsilon))
break

# 用之前的经验训练 agent
if len(agent.memory) > batch_size:
agent.replay(batch_size)

接下来具体看每个部分:

1. agent 的网络用一个很简单的结构为例:

image-20220419102957374

在输入层有 4 个节点,用来接收 state 的 4 个信息:小车的位置,车速,杆的角度,杆尖端的速度,
输出层有 2 个节点,因为 action 有 0,1 两个值:向左推车,向右推车,就对应着两个行为的奖励值。

1
2
3
4
5
6
7
8
def _build_model(self):
model = Sequential()
model.add(Dense(24, input_dim=self.state_size, activation='relu'))
model.add(Dense(24, activation='relu'))
model.add(Dense(self.action_size, activation='linear'))
model.compile(loss='mse',
optimizer=Adam(lr=self.learning_rate))
return model

2. 需要定义一个损失函数来表示预测的 reward 和实际得到的奖励值的差距,这里用 mse,

image-20220419103013594

例如,杆现在向右倾斜,这时如果向右推小车,那么杆就可能继续保持平衡,游戏的分数就可以更高一些,也就是说向右推车比向左推车拿到的奖励要大,不过模型却预测成了向左推奖励大,这样就造成了差距,我们需要让差距尽量最小。

3. Agent 如何决定采取什么 action

游戏开始时为了让 agent 尽量多尝试各种情况,会以一定的几率 epsilon 随机地选择 action,
之后它不再随机选择,开始根据当前状态预测 reward,然后用 np.argmax() 选择能最大化奖励的 action,
例如 act_values[0] = [0.67, 0.2] 表示 aciton 为 0 和 1 时的 reward,这个的最大值的索引为 0.

1
2
3
4
5
def act(self, state):
if np.random.rand() <= self.epsilon:
return random.randrange(self.action_size)
act_values = self.model.predict(state)
return np.argmax(act_values[0]) # returns action

4. 通过 Gym,agent 可以很轻松地就能与环境互动:

1
next_state, reward, done, info = env.step(action)

env 代表游戏环境,action 为 0 或 1,将 action 传递给环境后,返回: done 表示游戏是否结束,next_state 和 reward 用来训练 agent。

DQN 的特别之处在于 remember 和 replay 方法,

5. remember()

DQN 的一个挑战是,上面搭建的这个神经网络结构是会遗忘之前的经验的,因为它会不断用新的经验来覆盖掉之前的。
所以我们需要一个列表来存储之前的经验,以备后面对模型训练时使用,
这个存储经验的列表叫做 memory,

1
memory = [(state, action, reward, next_state, done)...]

存储的动作由 remember() 函数来完成,即将 state, action, reward, next state 附加到 memory 中。

1
2
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))

**6. replay() **

replay() 是用 memory 来训练神经网络的方法。

  • 首先从 memory 中取样,从中随机选取 batch_size 个数据:
1
minibatch = random.sample(self.memory, batch_size)
  • 为了让 agent 能有长期的良好表现,我们不仅仅要考虑即时奖励,还要考虑未来奖励,即需要折扣率 gamma,

具体讲就是我们先采取了行动 a,然后得到了奖励 r,并且到达了一个新的状态 next s,
根据这组结果,我们计算最大的目标值 np.amax()
然后乘以一个 discount 率 gamma,将未来的奖励折算到当下,
最后我们将当前的奖励和折算后的未来奖励相加得到目标奖励值:

1
target = reward + gamma * np.amax(model.predict(next_state))
  • target_f 为前面建立的神经网络的输出,也就是损失函数里的 Q(s,a)
  • 然后模型通过 fit() 方法学习输入输出数据对,
1
model.fit(state, reward_value, epochs=1, verbose=0)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def replay(self, batch_size):
minibatch = random.sample(self.memory, batch_size)

for state, action, reward, next_state, done in minibatch:
target = reward

if not done:
target = (reward + self.gamma *
np.amax(self.model.predict(next_state)[0]))

target_f = self.model.predict(state)
target_f[0][action] = target

self.model.fit(state, target_f, epochs=1, verbose=0)

if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# -*- coding: utf-8 -*-
import random
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

EPISODES = 1000 # 让 agent 玩游戏的次数

class DQNAgent:
def __init__(self, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.memory = deque(maxlen=2000)
self.gamma = 0.95 # 计算未来奖励时的折算率
self.epsilon = 1.0 # agent 最初探索环境时选择 action 的探索率
self.epsilon_min = 0.01 # agent 控制随机探索的阈值
self.epsilon_decay = 0.995 # 随着 agent 玩游戏越来越好,降低探索率
self.learning_rate = 0.001
self.model = self._build_model()

def _build_model(self):
model = Sequential()
model.add(Dense(24, input_dim=self.state_size, activation='relu'))
model.add(Dense(24, activation='relu'))
model.add(Dense(self.action_size, activation='linear'))
model.compile(loss='mse',
optimizer=Adam(lr=self.learning_rate))
return model

def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))

def act(self, state):
if np.random.rand() <= self.epsilon:
return random.randrange(self.action_size)
act_values = self.model.predict(state)
return np.argmax(act_values[0])

def replay(self, batch_size):
minibatch = random.sample(self.memory, batch_size)
for state, action, reward, next_state, done in minibatch:
target = reward
if not done:
target = (reward + self.gamma *
np.amax(self.model.predict(next_state)[0]))
target_f = self.model.predict(state)
target_f[0][action] = target
self.model.fit(state, target_f, epochs=1, verbose=0)
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay


if __name__ == "__main__":

# 初始化 gym 环境和 agent
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

done = False
batch_size = 32

# 开始迭代游戏
for e in range(EPISODES):

# 每次游戏开始时都重新设置一下状态
state = env.reset()
state = np.reshape(state, [1, state_size])

# time 代表游戏的每一帧,
# 每成功保持杆平衡一次得分就加 1,最高到 500 分,
# 目标是希望分数越高越好
for time in range(500):
# 每一帧时,agent 根据 state 选择 action
action = agent.act(state)
# 这个 action 使得游戏进入下一个状态 next_state,并且拿到了奖励 reward
# 如果杆依旧平衡则 reward 为 1,游戏结束则为 -10
next_state, reward, done, _ = env.step(action)
reward = reward if not done else -10
next_state = np.reshape(next_state, [1, state_size])

# 记忆之前的信息:state, action, reward, and done
agent.remember(state, action, reward, next_state, done)

# 更新下一帧的所在状态
state = next_state

# 如果杆倒了,则游戏结束,打印分数
if done:
print("episode: {}/{}, score: {}, e: {:.2}"
.format(e, EPISODES, time, agent.epsilon))
break

# 用之前的经验训练 agent
if len(agent.memory) > batch_size:
agent.replay(batch_size)

效果图:

image-20220419103543797 image-20220419103445191

参考来源:https://www.jianshu.com/p/7014c89abeea

DRL相关:

环境:https://gym.openai.com/

easyRL:https://datawhalechina.github.io/easy-rl/#/

神经网络与深度学习:https://nndl.github.io/nndl-book.pdf

好用的运维脚本&工具

jstack神器 (一个轻量级jstack脚本)

  • 快速安装:

​ source <(curl -fsSL https://raw.githubusercontent.com/oldratlee/useful-scripts/master/test-cases/self-installer.sh)

  • 常用命令:

​ show-busy-java-threads ,默认直接输出cpu busy java线程 top5, -p 可指定PID

​ show-busy-java-threads <重复执行的间隔秒数> [<重复执行的次数>] -a <运行输出的记录到的文件> 保存下后可以多次分析对比jsatck

image-20220418230324758

  • 快速下载单个文件
1
2
3
wget --no-check-certificate https://raw.github.com/oldratlee/useful-scripts/release/show-busy-java-threads
chmod +x show-busy-java-threads
./show-busy-java-threads

更多好用的功能:useful-scripts

jmap

  • 查看不同类型占用的内存
1
jmap -histo <进程号> | head -n 20 
  • GC情况
1
jstat -gcutil 17708 1000 

​ mac下好用的gc图形分析工具分析gc.log历史:gchisto下载

image-20220418230353485

  • dump 内存
1
2
jmap -dump:file=heap.bin <进程号> 
分析工具 Memory Analyzer 下载

arthas (终极杀手锏)

arthas介绍

  • 启动arthas

​ wget https://alibaba.github.io/arthas/arthas-boot.jar

​ java -jar arthas-boot.jar

  • dashboard

​ gc cpu mem等..一览无余,基本上可以秒杀其他工具了..

image-20220418230458069

上篇:

工具、效率、时间管理

引:
要成为一名专业人士,需要养成的另外一个强大的习惯就是时间管理技能

一、工具(软件开发工程师)

好用的工具可以在开发中处理一些事情上达到事半功倍的效果

  • 编译器:IntelliJ IDEA、Eclipse(老牌工具可以淘汰了)日常写代码

  • 终端:ITerm2 + oh My zsh, Mac下好用的终端,自带一系列常用 alias、选中即复制、强大的bash等等,完成日常一些终端操作(提交代码、mvn编译、ssh登陆(可配置一键登录或alias)等等)

  • 浏览器:chrome+一系列方便的插件(Vimium、Adblock、ESHead、划词翻译、掘金、Tampermonkey等等)着重推荐Vimium 基本操作浏览器可以不用鼠标了,习惯后效率肯定高于鼠标操作。

  • 本地/远程调试:postman

  • 数据库管理工具:Sequel Pro, mac下快捷键切换比较方便

  • 任务管理TickTick:滴答清单(自带番茄时钟和todoList)

  • git一些好用的界面工具:GitHub Desktop(方便看diff避免误提交)+Sourcetree(方便看git log和搜索)git命令都可以做到但展示效果没有界面工具效果好。

  • 终极神器:Alfred,可以完成mac下一系列的骚操作. 比如快速切换上面一堆应用、一键执行mac上任何一个任务及打开某个功能、搜索某个东西、比如定制自己的workflow基于脚本一键完成线上token获取等等等。

以上,串起来上面的工具,代码写到提交完整过程可以是:从自己todo管理工具里找到任务—->idea写代码—->打开github desktop对比本次git diff差异-→item2 mvn自编译通过再提交。

git相关常用操作(因此强烈建议配置alias)

  • 推送拉取合并:git pull、git rebase、git merge、git push -f origin/xx

  • 本地常用commit点操作:git checkout xxx 、git checkout -b xxx、git amend 、git rebase -i

  • 多任务协同开发:git stash 或每个任务单独切分支,每次push前pull 并rebase下远程分支避免分支和提交的path冲突混乱,合入后再git branch -D 删掉

  • 回退恢复:git reset –hard/soft (hash)

  • 操作提交的commit远程分支上线前用得比较多:git revert/git cherry pick等

二、效率&时间管理

即一些工作意识

早晨——一天的开始:比如早上打开电脑第一步:check 当天邮件会议&日程安排、番茄(活动清单->今日待办—>估算番茄数(25分钟/个)—>执行+反馈记录) 请使用至少2星期,然后好习惯的话,若有比较重要线上业务监控,可以看看监控图、每天要解的bug、查收邮件可以固定放在某一定点,在工作中的番茄时间里尽量不被外界因素打破(会议),定时看看提交的patch列表在里面找几个评审review下。

除了TickTick(滴答清单)工具外,也可使用名为Trello(http://trello.com)、Kanbanflow(http://simpleprogrammer.com/ss-kanbanflow)的工具作为看板来组织一周的工作,推荐后者,因为Kanbanflow有一个内置的番茄钟定时器。

晚上——一天的结束:晚上固定时间回顾跟踪当天清单里task完成情况,基于番茄时钟时间追踪,了解分析每天的时间都是怎么花掉的,看看数据,找出最大的2~3个时间杀手,并及时调整必要的话反思或反馈等等。
关于会议:符合精简原则,关于精简会议的更多细节,可以参考Jason Fried和David Heinemeier Hansson合著的《重来》(Rewor)[Crown Publishing Group, 2010]。豆瓣
学会分解任务——要吃掉一头大象,每次吃一口
关于复盘:复盘的意义是什么,我们为什么要复盘?推荐一部电影《萨利机长》
排期&管理:推荐一篇文章,看看NASA航天级别超级大项目是如何被高效管理的 “阿波罗”登月中的工程管理一瞥
一些常见的时间杀手
看电视。
社交媒体。
新闻网站。
不必要的会议。
烹饪。
玩电子游戏(尤其是网络游戏)。
工间喝咖啡休息。

三、番茄工作法介绍

番茄工作法介绍
”对番茄工作法的正确理解可以令工作生活大为改观,它不仅能帮我能做更多事情,而且能让我可以尽情享受业余时间。一旦我完成了当天的目标(以番茄钟来度量的),我就可以自由自在地做自己想做的事情,甚至能事先控制自己将时间用在哪儿,而不是回过头看自己的花时间都去哪儿了。“ ——某书摘

数学家、理论物理学家、工程师和科学哲学家昂利·庞加莱(Henri Poincaré)在一篇发表的文章http://www.calnewport.com/blog/?s=%23craftsmanincubicle中写到: “加莱……通常在上午10点到12点和下午5点到7点工作。他发现工作再长时间也鲜有成果。”
John Resig的一篇博客文章,John是我非常尊敬的开发者。他在一篇题为“每天写代码”(Write Code Every Day)的文章中,谈到了自己的经历。他之前在业余项目上毫无进展,直到养成了每天至少用30分钟写一定量有用的代码的习惯。在实行新惯例之后,它成了一种习惯,这使他的生产 力获得了巨大的提高。
可以在http://simpleprogrammer.com/ss-write-code阅读这篇文章的完整内容。
番茄工作法介绍:一些番茄工作法有关的文章https://monotasking.blog/

四、相关书籍

比较有现实指导意义的书籍

《软技能-代码之外的生存指南》https://book.douban.com/subject/26835090/

《番茄工作法图解》https://book.douban.com/subject/5916234/

《搞定-无压工作的艺术》https://book.douban.com/subject/4849382/

长期来看值得思考

《原则-生活和工作》https://book.douban.com/subject/30189843/

《暗时间》 https://book.douban.com/subject/6709809/

《奇特的一生》https://book.douban.com/subject/1115353/

下篇 :

开发规范、迭代管理规范

一、迭代管理规范

1.1迭代节奏

image-20220418221210621

1.2、 Scrum 实践

image-20220418223155864

​ 1⃣️-9⃣️是一个敏捷迭代的过程

在阶段2⃣️的sprint Planning Meeting 中应该产生模块负责人,自愿方式(产生自RD、FE、PM)中。
模块负责人职责:

    a、迭代周期中定期check所负责模块干系人员开发进度,同步风险。
    b、协调资源,保障所负责模块顺利上线
    c、参与日常站会

在阶段3⃣️已经确定下了本期sprint Backlog后,接着还应该再做两件事:

a)分派

​ 由负责人拆分大的需求卡片(可以是迭代需求和系统优化需求2部分),然后由开发同学主动认领或由负责人指派任务卡片。

​ 一个拆分后的icafe卡片应该至少包含:

​ a、任务标题及类型
​ b、必要时的任务内容概述
​ c、负责人
​ d、工时估算

b)协调排期

​ 往往一个需求在sprint期间需要PM、RD、FE、QA一起协作完成,因此在sprint的开始阶段4⃣️开始前,应该事先协调好不同角色参与者的排期。

具体方式可以是一个可以直观的展示时间轴的排期日历或看板墙,比如云端共享excel或目前采用的甘特图软件,方便sprint期间聚焦对齐,模块负责人也方便及时跟进。

image-20220418223920020

排期协作带来的好处:

​ 模块负责人机制保障了分工明确,上下对齐,很好的锻炼了个人协调能力,帮项目总负责人(scrumMaster)分担精力
不同角色参与者事先明确排期,最大化的规避了delay风险,提高了彼此的效率和团队协同节奏感

二、后端编程规范

大部分上规则参考阿里巴巴Java开发手册(华山版)

开发中常见一些容易忽视的开发习惯整理:

1、命名问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1.1、局部变量
反例
{
//过于随意的局部变量命名,不够整洁和严谨
User rs = userService.getUserInfoById(userId);
String m = rs.getMobile();
String id = rs.getIdCard();
String add = rs.getAddress();
String v = rs.getValue();
}

正例
{
User user = userService.getUserInfoById(userId);
String mobile = user.getMobile();
String idCard = user.getIdCard();
String address = user.getAddress();
}

1.2、私有方法命名

私有方法有时候和会和当前所在类关系不太大,仅是临时用于抽取,故应该尽量在命名上能见名知义
反例:
Map getInfo(){
}
Map getMap(){
}

2、循环嵌入较深

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
反例,嵌入过深,降低了代码可读性和可维护性
User rs = userService.getUserInfoById(userId);
if (rs != null) {
Order rs2 = orderService.getOrderInfo(orderId);
if (rs2 != null) {
String name1 = rs2.getOrderName();
if (name1 != null) {
///
}
}
}


正例
//在一开始就取反返回,有效避免了后续更多的嵌入,或借助Optional消除分支判断
User user = userService.getUserInfoById(userId);
if (user == null) {
return false;
}
Goods goods = stockService.getGoodsInfoById(goodsId);
if (goods == null) {
return false;
}

3、service做的逻辑过于冗余

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
反例,12-25行做的逻辑全部是check参数和适配组装等常见的service层无关的次要逻辑,显得整个isEnableToBy方法逻辑略显臃肿
//判断当前用户是否可以支付当该订单
@Autowired
private UserService userService;
@Autowired
private StockService stockService;
@Autowired
private MemberService memberService;
boolean isEnableToPay(String userId, String orderId) throws IOException {
User user = userService.getUserInfoById(userId);
if (user == null) {
return false;
}
int age = user.getAge();
String mobile = user.getMobile();
String idCard = user.getIdCard();
String address = user.getAddress();
if (mobile != null && mobile.length() > 11) {
throw new RuntimeException("电话格式非法");
}
if (age < 18) {
throw new RuntimeException("年龄非法");
}
UserDto userDto = new UserDto(user);
boolean isMember = memberService.isMember(userDto);
if(!isMember){
return false;
}
boolean isOrderExist = orderService.isExist(orderId);
if (!isOrderExist) {
return false;
}
//step3:checkOrder
boolean isOrderExist = orderService.isExist(orderId);
if (!isOrderExist) {
return false;
}
return true;
}

正例:尽量简化service逻辑,主干只保留了引入的service参与计算的逻辑,次要非service逻辑,抽离出去,显得过程会清晰,例子中isEnableToPay方法只干了3件事,a)获取用户信息、b)检测是否是会员、c)订单是否存在,完成了判断用户是否可以完成支付这件事,思路也比较清晰


//判断当前用户是否可以支付该订单
boolean isEnableToPay(String userId, String orderId) throws IOException {
//step1:get userInfo
User user = userService.getUserInfoById(userId);
boolean legalUser = UserUtil.checkUser(user);
if (!legalUser) {
return false;
}
//换行
UserDto userDto = UserUtil.adapterUserDto(user);
//step2:check isMember
boolean isMember = memberService.isMember(userDto);
if (!isMember) {
return false;
}
//step3:checkOrder
boolean isOrderExist = orderService.isExist(orderId);
if (!isOrderExist) {
return false;
}

return true;

}

4、类的职责问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//4.1 XXXDao、XXXService
反例:订单服务接口定义中又出现了查询库存服务,势必service实现类又被迫引入了OrderDao,显得服务职责不够明确,后拆分期维护成本大
public interface OrderService {
//有时 类名、方法名、参数、返回值 4者一起,可一起用来声明该方法
boolean isExist(String orderId);

Order getOrderById(String orderId);

Stock getStockInfo(String stockId);

}
正例:基础的Dao、Service层只做当前XXXDao、XXXService相关的CURD逻辑,禁止混用,复杂的业务可以再向上抽象一层service调度各基础层service做交互计算,最终在controller层统一收口

public interface OrderService {
boolean isExist(String orderId);

Order getOrderById(String orderId);

}
//将库存服务拆出去
public interface StockService {
boolean isExist(String orderId);

Stock getStockInfo(String stockId);

}
//4.2 尽量不要在controller层做多余的逻辑(可以封装到serveice层),controller层做到足够精简,另外bean、XXUtil里不应该引入service.

5、bean的定义

业务上除了解析反序列化提取上游响应信息如:组件es、redis等复杂数据结构外,应该避免直接使用map包装大json,序列化VO或组装DTO时,应该定义好该数据结构。

若不需要反序列化的Response json且创建的频率是小而频繁的,可以考虑使用 #common.model.LiteModel

6、log、注释 规范

1
2
3
4
5
6
7
8
9
10
11
12
1、log
反例:
log.info("success");
正例:关键地方点,日志打印出当前上下文相关的信息,userId,sessionId等有价值的标示信息,否则grep日志排查比较困难,相当于没打日志
log.info("pay success,userId:{},orderId:{},userId,orderId);

2、注释
代码当中出现注释来解释代码,出现注释可能原因是作者认为某些内容没有说清楚,需要增加注释。有些注释比较有用:
之处为什么要使用特定的方式完成某个工作
引入了不常见的算法

其他啰嗦的代码注释可以通过代码来自解释

三、codeReview规范

3.1 面向提交者

  1. 建议先 mvn clean compile -P dev -Dmaven.test.skip=false 保证编译可通过,且已自测
  2. 一次提交的代码量不要太大
  3. 一个评审的变更应该尽量小,只做一件事,如果一件事可以拆分成多件,那么应该对应为不同的cr,例如:尽量不要将【修复一个bug】和【重构一个函数】放在同一个评审里
  4. 精准指定reviewer (必须指定:1、自己当前负责开发模块相关负责人,2、小组所在具有+2权限的同学(blocking reviewer),3、及一起开发同一模块的小伙伴)
  5. 写好 commit message:见:【四、git commit message规范】
  6. 对于复杂大的新需求或重构,最好能在评审里附上需求背景及技术设计文档,方便评审者快速深入了解背景

3.2 面向评审者

  1. 小组的blocking reviewer、模块负责人及时看icode评审提醒, 必须review代码,并给出结论(打分)

  2. 其余相关人员鼓励踊跃附上精彩评论,blocking reviewer可以作为参考,例如非blocking reviewer有2个+1,则blocking reviewer直接可以给出+2

  3. CodeReview CheckList:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    命名规范,是否见名知义?
    类、函数是否太长,嵌套是否太深
    代码是否具有较高的可读性,再过一年自己还能看懂吗?
    对象NPE判断了吗、各种边界条件处理了吗?
    API设计是否符合接口规范
    代码是否明显存在可复用的地方?
    线程安全、性能问题、是否为良好的设计模式、是否会有内存泄露?
    事务一致性处理,依赖服务宕机时会发生什么?
    自己遇到过同样的坑?
    有testcase?
    业务逻辑处理是否有明显的问题?
    更多见【二、后端编程规范】
    ...
  1. 最佳实践:

    1
    2
    3
    4
    5
    一次review代码在500行内,且不超过30分钟
    及时review代码,确保当天的review任务当天完成,提升彼此效率
    认真review,并对review结果负责
    不要吝惜自己的comments
    整个评审如果都看不懂,可以当面去问

反例:
一块模块T由A1、A2一起开发,A0 是blocking reviewer,但由于A0没有及时review或没有仔细review就 +2合入,同时也会间接造成A1不断的基于模块T提交bugFix,或A1可能后期就直接找不相干的B去合入。

造成的影响是:有始无终,A0 没有能及时知晓且cover住A1代码可能会出现的问题,同时A2也完全不知道后期的变更,是一个尴尬的场景。

正例:

若B是blocking reviewer可以选择拒绝+2(特殊情况除外),A1、A2、A0应有始有终,也减少了A1返工fixBug甚至是面临方案重新设计的风险.

3.3 review带来的好处

  1. 提高代码质量
  2. 找出潜在bug,提高提测质量,避免来回返工
  3. 互相学习,提升团队coding水平
  4. 让其他不熟悉代码的人明白作者意图想法,便于以后轻松维护代码,必要时可backup

四、git commit message规范

commit message由icafe项目标识+【commit类型】+commitDescription组成.

常用到的commit类型:

  • story(产品需求)
  • bugFix
  • refactor(重构&优化)
  • Test(单元测试)
  • Other(不在上述范围内的) 等

commitDescription原则:

  • 尽量简短的一句话描述commit作用
  • 最好使用中文
  • 能体现出主、谓、宾、定若有依赖关系,需要特殊体现

带来的好处:

  • 节省上线发布人员时间,可以清晰的判断变更带来的影响及回滚止损等
  • 规范后可以自动生成changLog也方便统计
  • 反例:
    
    * | | 9dffee08d NextGenDialog-2677 fix bugs
    * | | 5290ce960  Merge "NextGenDialog-2677 fix bugs" into feature-5.3
    * | | 8323b4aae NextGenDialog-2679 fix
    * | | 9fe529f2f NextGenDialog-2039 降低复杂度    //既像bug又像story
    * | | a89f20243 AICP-WEB-3058 优化
    
    
    正例:
    
    * | | 9dffee08d NextGenDialog-2677【bugfix】创建订单时未判断用户是否是会员
    * | | 5290ce960  Merge "NextGenDialog-2677 【bugfix】创建订单时未判断用户是否是会员" into feature-5.3
    * | | 9fe529f2f NextGenDialog-2039 【story】创建订单接口
    * | | a89f20243 AICP-WEB-3058 【refactor】订单创建逻辑优化重构
    

好久不用hexo,发现一堆问题, 记录如下:

一、前言

1.1 交互的趋势

让机器去适应人, 人不是人是适应机器

任何产品都需要依赖对话系统为之赋能, 未来, 对话系统将可能成为重要的人机交互窗口. 这一切,都可以从设计一个小的封闭域Chatbot开始——LuQi.

举个例子: “帮我查一下明天或者后天,晚上最便宜的去上海的机票”, 从用户的操作和实际体验来看, 图形交互无法一次给出结果, 用户需要先打开App, 先查一次明天机票再查一次后天机票, 然后手动对比结果. 而对话式交互 “完胜” , 它可以直接给出相关条件的检索结果,前提是人工智能足够优秀.

Read more »

Java8 新特性总结

1
2
3
4
5
6
7
8
9
10
11
一、lamada、函数式编程
二、Stream介绍
Stream流⽔线解决⽅案
三、默认方法:
四、异步处理
五、时间处理类
六、why java8?
七、函数编程技巧
八、超越java 8—jdk版本重大特性总结
九、结论总结
十、附录

一、lamada、函数式编程

函数式编程:

即行为参数化;

lamada:

简洁地表示可传递的匿名函数的一种方式:它没有名称,但它有参数列列表、函数主体、返回类型,可能还有⼀一个可以︎出的异常列列表。

特点:︎︎︎︎匿名、函数、简洁、传递

可以在函数式接口上使⽤用Lambda表达式。

函数式接⼝:只定义⼀个抽象⽅方法的接⼝
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
demo

Fruit apple = new Fruit("apple", 1);
Fruit banana = new Fruit("banana", 2);
List<Fruit> fruitList = Lists.newArrayList(apple, banana);

//java.util.function包中引⼊了⼏个新的函数式接⼝
// ⼀、⼀些函数式接⼝
// 1、Consumer
Consumer<? super Fruit> c = (Consumer<Fruit>) fruit ->
System.out.println(fruit.getName());
c.accept(apple);
fruitList.forEach(c);
//2、Supplier,好处:⽤的时候相当于每次直接从⼯⼚⾥⾯Lazy返回⼀个结果
Supplier<Fruit> ss = Fruit::new;
System.out.println(ss.get());
System.out.println(ss.get());
//3、Function
MyFunction<String, Integer, Fruit> sssa = Fruit::new;
sssa.apply("s", 32);
Function<Fruit, Integer> a = Fruit::getMoney;
// 4、Predicate
Predicate<Fruit> p = o -> o.getMoney() > 1;
Integer size
=fruitList.stream().filter(p).map(a).collect(Collectors.collectingAndThen(toLi
st(), List::size));


// ⾃定义Function⽀持多参数
@FunctionalInterface
public interface MyFunction<T, U, R> {
/**
* Applies this function to the given arguments.
*
* @param t the first function argument
* @param u the second function argument
* @return the function result
*/
R apply(T t, U u);
}

二、Stream介绍:

    在调⽤collect之前,没有任何结果产⽣,实际上根本就没有从menu⾥选择元素。 流的处理过程

是一种内部迭代,** 你可以这么理解:链中的⽅方法调⽤用都在排队等待,直到调⽤用collect。Java 8的Stream以
其延迟︎︎性而著称,它们被︎刻意设计成这样,即︎︎︎︎︎︎延迟操作,有其独特的原因: Stream就像是⼀一个︎盒,它接收请
求⽣成结果。当你向一个 Stream发起一系列列的操作请求时,这 些请求只是被⼀一保存起来。只有当你
向Stream发起⼀个︎︎︎作时,才会实际地进⾏计算。这种设计具有显著的优点,特别是你需要对Stream进⾏多个操作时(你有可能先要进⾏filter操 作,紧接着做⼀个map,最后进⾏⼀次终端操作reduce;这种 ⽅式下Stream只需要遍历⼀次, 不需要为每个操作遍历⼀次所有的元素.

粗略地说,集合与流之间的差异就在于什么时进⾏计算。

图4-2显示了流操作的顺序:filter、map、limit、collect 每个操作简介如下。 举个例⼦:

1
2
List<String> dishenameList = menu.stream().filter(d -> d.getCalories() >
300).ma p(Dish::name).limit(3).collect(toList());

image-20211102185024921

流只能遍历⼀次。遍历完之后,我们就说这个流已经被消费了,——Streams库的内部迭代可以⾃动选择 ⼀种适 合你硬件的数据表示和并⾏实现。流利⽤了内部迭代:替你把迭代做了.

image-20211102190455717

image-20211102190514125

Stream流⽔水线解决⽅方案

Stream上的所有操作分为两类:中间操作和结束操作,中间操作只是一种标记,只有结束操作才会
触发实际计算。中间操作⼜可以分为⽆状态的( Stateless )和有状态的( Stateful ),⽆无状态中间操作是指元素
的处理理不不受前面元素的影响,而有状态的中间操作必须等到所有元素处理理之后才知道最终结果,⽐如排
序是有状态操作,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,⽐比如找到第一个满⾜足条件的元素。之所以要进行如此精细的划分,是因为底层对每⼀种情况的处理方式不同。

    很多Stream操作会需要⼀个回调函数(Lambda表达式),因此一个完整的操作是<*数据来源,操作、回调函数 > 构成的三元组。 Stream 中使⽤用 Stage 的概念来描述⼀个完整的操作,并⽤某种实例例化后的PipelineHelper*来代表Stage,将具有先后顺序的各个Stage连到⼀起,就构成了整个流⽔水线。跟

Stream相关类和接口的继承关系图示。

Stream流⽔水线组织结构示意图如下:

image-20211102191203210

图中通过 Collection.stream() ⽅法得到Head也就是stage0,紧接着调⽤⼀系列的中间操作,不断产 ⽣新的Stream。这些Stream对象以双向链表的形式组织在⼀起,构成整个流⽔线,由于每个Stage都 记录了前⼀个Stage和本次的操作以及回调函数,依靠这种结构就能建⽴起对数据源的所有操作。这就 是Stream记录操作的⽅式。当前Stage本身才知道该如何执⾏⾃⼰包含的动作。这就需要有某种协议来 协调相邻Stage之间的调⽤关系。这种协议由Sink接⼝完成.

image-20211102191357093

实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接⼝口⽅方法

1
2
3
遍历元素时调⽤用,接受⼀一个待处理理元素,并对元素进⾏行行处理理。Stage
把⾃自⼰己包含的操作和回调⽅方法封装到该⽅方法⾥里里,前⼀一个Stage只需要
调⽤用当前Stage.accept(T t)⽅方法就⾏行行了了。

图中通过Collection.stream()⽅方法得到 Head 也就是stage0,紧接着调⽤用⼀一系列列的中间操作,不不断产
⽣生新的Stream。 这些Stream对象以双向链表的形式组织在⼀一起,构成整个流⽔水线,由于每个Stage都
记录了了前⼀一个Stage和本次的操作以及回调函数,依靠这种结构就能建⽴立起对数据源的所有操作
。这就
是Stream记录操作的⽅方式。当前Stage本身才知道该如何执⾏行行⾃自⼰己包含的动作。这就需要有某种协议来
协调相邻Stage之间的调⽤用关系。这种协议由 Sink 接⼝口完成

实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接⼝口⽅方法

image-20211102191428454

Sink完美封装了了Stream每⼀步操作,并给出了[处理->转发]的模式来叠加操作。

举例:

image-20211102191456731

总⽽⾔之,流的使⽤⼀般包括三件事:

⼀个数据(如集合)来执⾏⼀个查询;

⼀个中间作,形成⼀条流的流⽔线;

⼀个操作,执⾏流⽔线,并能⽣成结果。

 以上,流的流⽔线背后的理念类似于构建器模式。在构建器模式中有⼀个调⽤链⽤来设⼀套配 (对流来 说这就是⼀个中间操作链),接着是调⽤built⽅法(对流来说就是终端操作)。 

如你所⻅,Stream API实现如此巧妙,即使我们使⽤外部迭代⼿动编写等价代码,也未必更加⾼效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
demo

//二、Stream
// 1、流的扁平化 flatMap
List<String> words = Lists.newArrayList("Hello", "World");
List<String> newWords = words.stream()
.map(word -> word.split(""))
.flatMap(Arrays::stream)
.distinct()
.collect(toList());

// 2、归约 将流中所有元素反复结合/折叠起来,得到一个值
List<Integer> numbers = Lists.newArrayList(1, 2, 3);
int sum = numbers.stream().reduce(0, Integer::sum);

// 3、并行流(parallel),默认使⽤forkJoin(其实就是分治算法的并⾏版本)
//并⾏流并不总是⽐顺序流快

// ⾃动装和操作会⼤⼤低性能
fruitList.stream().parallel().collect(Collectors.toList());

// 方式1 自定义一个Collector收集器,定制自己的stream处理逻辑
List<Fruit> ppp = fruitList.stream().collect(new ToListCollector<>());
// 方式2,更紧促
List<Fruit> fruits = fruitList
.stream()
.collect(
ArrayList::new, //supplier 供应源
ArrayList::add, //accumulator 累加器
List::addAll //combiner 组合器
);

//6、 java.util.Optional 一个容器类,代表一个值存在或不存在
Optional.of(banana).filter(e -> e.getMoney() == 1).get();

// 三、java8 其他高阶操作:
// 1、科里化 􏱷
// 是一种􏳿将􏶞􏵕2个参数(比如,x和y)的函数f转􏷊化为使用一个参数的函数g,
// 并且这个函数的返回值也是一个函数,它会作为新函数的一个参数。
// 后者的返回值和􏱍始函数的 返回值相􏱬,即f(x,y) = (g(x))(y)。
// 比如:单位转换通常都会􏸙及转换因子以及基线调整因子的问题。
DoubleUnaryOperator convertCtoF = curriedConverter(9.0 / 5, 32);
DoubleUnaryOperator convertUSDtoGBP = curriedConverter(0.6, 0);

convertCtoF.applyAsDouble(1D);
convertUSDtoGBP.applyAsDouble(1D);

// f(x)=ax+b
static DoubleUnaryOperator curriedConverter(double f, double b) {
return (double x) -> x * f + b;
}


/**
* @desc: 自定义Collector 将Stream<T>中的所有元素收集到一个 List<T>里
*/
public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
// 建立􏾒的结果容器
@Override
public Supplier<List<T>> supplier() {
return ArrayList::new;
}

//将􏵐􏷷元素添加到结果容器
@Override
public BiConsumer<List<T>, T> accumulator() {
return List::add;
}
// 合并􏲫两个结果容容器
@Override
public BinaryOperator<List<T>> combiner() {
return (list1, list2) -> {
list1.addAll(list2);
return list1;
};
}

//􏴮对结果容器应用的最终转换
@Override
public Function<List<T>, List<T>> finisher() {
return Function.identity();
}

@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(
IDENTITY_FINISH, CONCURRENT));
}
}

三、默认⽅方法:

它提供的能⼒力力能帮助类库的设计 者们定义新的操作,增强接⼝口的能⼒力力,它们︎蔽了了将来的变化对 ⽤用户的

影响.

比如:List类1.8新增的foreach⽅法

1
2
3
4
5
6
7
8
9
10
* @param action The action to be performed for each element
* @throws NullPointerException if the specified action is null
* @since 1.8
*/
default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}

四、异步处理理

CompletableFuture

CompletableFuture提供了了像thenCompose、thenCombine、allOf这样的 操作,对Future︎及的通⽤用设
计模式提供了了函数式编程的细︎度控制,有助于避免使⽤用 命令式编程的模︎代码。

五、时间处理理类

LocalDateLocalTimeInstantDuration 以︎及 Period 极⼤简化⽅便了日期处理⽅式,包括:

  • Java 8之前⽼版的java.util.Date类以及其他⽤于建模⽇期时间的类有很多不⼀致及 设计上的缺,包括易变性以及的值、默认值和命名。
  • 新版的⽇期和时间API中,⽇期时间对象是不可变的。
  • 新的API提供了两种不同的时间表示⽅式,有效地区分了运⾏时⼈和机器的不同需求。
  • 可以⽤绝对或者相对的⽅式操⽇期和时间,操作的结果总是返回⼀个新的实例,⽼的⽇期时间对象不会发⽣变化。
  • TemporalAdjuster让你能够⽤更精细的⽅式操⽇期,不再限于⼀次只能改变它的 ⼀个值,并且你还可按照需求定义⾃⼰的⽇期转换器。 你现在可以按照特定的格式需求,定义⾃⼰的格式器,打印输出或者解⽇期时间对象。 这些格式器可以通过模创建,也可以⾃⼰编程创建,并且它们都是线程安全的。
  • 可以⽤相对于某个地区/位的⽅式,或者以与UTC/格尼时间的绝对差的⽅式表 示时区,并将其应⽤到⽇期时间对象上,对其进⾏本地化。
  • 可以使⽤不同于ISO-8601标准系统的其他⽇历系统

六、why java8?

函数式编程:

  • 实现和维护系统,无需担心锁引起的各种问题,充分发︎系统的并发能力;
  • 共享的可变数据,︎︎纯粹且⽆作用.在完全无锁的情况下,使⽤用多核的并发机制;
  • 下⾯面是这⼀章中你应该掌握的关键概念。
  • 从⻓远看,︎少共享的可变数据结构能帮助你︎低维护和调︎程序的代价。
  • 函数式编程⽀持⽆副作⽤的方法和声明式编程。
  • 函数式方法可以由它的输入参数及输出结果进行︎断。
  • 如果一个函数使用相同的参数值调用,总是返回相同的结果,那么它是引⽤用透明的。采用递︎可以取得迭代式的结构,⽐比如while循环。
  • 相对于Java语言中传统的递︎,“︎︎递”可能是⼀种更好的⽅式,它开启了一︎扇门,让我们有机会最终使用编译器进行优化。

七、函数编程技巧

  • ⾼高︎阶函数:
1
2
3
Function<String, String> transformationPipeline
= addHeader.andThen(Letter::checkSpelling)
.andThen(Letter::addFooter);
  • 函数科⾥化(⻅上⽂demo)

  • Stream 延迟计算 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 // 延迟计算:
//1是集合类持有的所有元素都是存储在内存中的,非常巨大的集合类会占用大量的内存,
// 而Stream的元素却是在访问的时候才被计算出来,这种“延迟计算”的特性有点类似Clojure的lazy-seq,占用内存很少。
//2是集合类的迭代逻辑是调用者负责,通常是for循环,而Stream的迭代是隐含在对Stream的各种操作中,例如map()。
Stream.generate(new Supplier<Long>() {
//反复调用get(),将得到一个无穷数列,利用这个Supplier,基于生成流,可以创建一个无穷的Stream
long value = 0;

@Override
public Long get() {
this.value = this.value + 1;
return this.value;
}
}).map((x) -> x * x).limit(10).forEach(System.out::println);
//对这个Stream做任何map()、filter()等操作都是完全可以的,
// 这说明Stream API对Stream进行转换并生成一个新的Stream并非实时计算,而是做了延迟计算。

Stream<Long> natural = Stream.generate(new NaturalSupplier());
natural.map((x) -> {
return x * x;
}).limit(10).forEach(System.out::println);

  • 模式匹配? Java8⽀持的不太好,感觉作⽤不太⼤,代替switch case

  • 实现缓存记忆表: ⽐如Map::computeIfAbsent()⽅法

1
2
3
4
5
6
7
// cache类似DP算法中的记录薄,基于function,computeIfAbsent每次会将表达式计算结果暂存,提升递归性能,非常的优雅, // 同样的实现,在java8前会很麻烦
static int fibonacciJava8(int n) {
return (int) cache.computeIfAbsent(n, (key) -> {
System.out.println("calculating FibonacciJava8 " + n);
return fibonacciJava8(n - 2) + fibonacciJava8(n - 1);
});
}
  • 结合器 ⽐如:CompletableFuture::thenCombine(),该⽅法接受两个CompletableFuture⽅法 和⼀个BiFunction⽅法,返回 另⼀个CompletableFuture⽅法。
1
2
3
4
demo: 
static Function compose(Function g, Function f) {
return x -> g.apply(f.apply(x));
}
  • 其他:

    1
    2
    3
    4
    5
    6
    7
        // 4、骚操作:实现一个失败后重试最多5次的代码
    Stream.generate(() -ss> Math.random() > 0.8 ? "ok" : null)
    .limit(5)
    .filter(Objects::nonNull)
    .findFirst()
    .ifPresent(System.out::println);
    }
  • Idea ⾃带Refactor Function 骚操作:

    image-20211102194326898

八、超越java 8—jdk版本重大特性总结

Java 8 : lambda 函数式计算、 stream 、 JVM 废弃永久代 , 引入 metaSpace 区

Java9 : Jigsaw 模块系统 ( 打包模块化 )

增强了 Stream , Optional , Process API

新增 HTTP2 Client

Java 10: 新增局部类型推断 var

Java 11 : Lambda 表达式中使用 var , 引入 ZGC( 实验功能 )

Java 12 : switch 表达式扩展 , 增强 g1 ( 自动返回未使用堆给操作系统 )

java 13 : 增强 ZGC( 释放未使用内存 ) 、引入文本块功能

Java 14: 移除CMS收集器

Java 15(2020-9-15 发布 ): ZGC 、 Shenandoah 垃圾回器 从实验功能变为产品功能

九、结论总结

1 、行为参数化(Lambda以及方法引用)

这些值具有类似Function<T, R>、Predicate或者BiFunction<T, U, R>这样的类型,值的接收方可以通过
apply、test或其他类似的⽅法执行这些方法。Lambda表达式自身是 一个相当酷︎的概念,不不过Java 8对
它们的使用方式——将它们与全新的Stream API相结合,最终把它们推向了了新一代Java的核⼼。

2 、流

它采用︎︎算法将这些操作组成一个流水线,通过单次流遍历, 一次性完成所有的操作。

它的parallel⽅法能帮助将一个Stream标记为适合进⾏并⾏处理。

3 、 CompletableFuture

CompletableFuture提供了了像thenCompose、thenCombine、allOf这样的 操作,对Future︎及的通⽤用设
计模式提供了了函数式编程的细︎度控制,有助于避免使用命令式编程的模︎代码。

4 、optional

如果在程序中始终如一地使用Optional,你的应⽤应该永远不会发⽣生NullPointerException异常。

5 、默认⽅法

它提供的能力能帮助类库的设计者们定义新的操作,增强接口的能力,它们︎蔽了将来的变化对用户的
影响.

十、附录

Lambda表︎式和JVM︎字节码

编译时,匿匿名类和Lambda表达式使⽤了不不同的字节码指令。(javap -c -v ClassName)

Lambda 创建额外的类 现在被invokedynamic指令替代了了。这种方式使⽤用 invokedynamic ,可以将实现
Lambda表达式的这部分代码的字节码⽣成 推︎到运行时。这种情况下,编译器可以生成 ⼀一个方法,该⽅
法含有该Lambda表达式同样的签名.

参考:

流介绍: https://www.edjdhbb.com/2019/02/23/java-8-stream%E5%BA%95%E5%B1%82%E5%8E%9F%E7%90%86/

《Java8 实战》https://github.com/zxiaofan/JavaBooks/blob/master/Java%208%E5%AE%9E%E6%88%98.pdf

1NQ高效线程池

关于1NQueue框架的使用和设计思考:
1NQueue设计的目的,是为了轻而快的帮助应用实现异步处理,同时降低请求并行度及提高应用吞吐量。最终实现,通过很小的改动及非常轻的体量,帮助应用在性能方面实现比较大的提升。
框架的应用和设计思想也很简单,在 保证线程池灵活伸缩 的基础上,通过一定的改造, 使整个框架具备一个很高的执行效率,充分挖掘并行计算的处理能力

特点:

①前置收集:收集提交任务的时候可以在异步提交的基础上再自定义一个Queue,整个流程是任务先提交到Queue我们再启动一个守护线程去不断拉取并往线程池提交,这样以来相当于在水池入口处安装了一个三角漏斗,使水的流速在快的情况下减缓注入,有点时间换空间思想,但也不全是。
②后置处理:线程池任务到达一定阈值会触发拒绝策略,1NQ自定义拒绝策略,启用线程Deque模式,将所有任务Drawto到Deque中。

所以,倘若我们固定了大小,带来的后果就是要么处理能力闲置导致资源浪费,要么就是处理能力不够导致应用性能下降。

如果使用无界的线程池就等于放弃了线程池的伸缩性,这样,线程池就变成了一个固定大小的线程池。这样的话,线程数量的设置就成了一个比较棘手的问题,过大,会造成资源的浪费,过小,会出现性能不足的情况。
所以,我们可以自定义线程池来针对具体业务模型提高并发处理能力,也像netty那样,在JDK基础上自定义了一堆高性能的组件,从而更高效的挖掘CPU处理能力。

BIO,同步阻塞式IO,简单理解:一个连接一个线程(适合稳定少数连接)

NIO,同步非阻塞IO,简单理解:一个请求一个线程: 标准/典型的Reactor(适合大量短链接)

AIO,异步非阻塞IO,简单理解:一个有效请求一个线程:改进实现的Proactor(适合长重操作,代价是新启线程)

BIO里用户最关心"我要读",NIO里用户最关心"我可以读了",在AIO模型里用户更需要关注的是"读完了"。

概念

NIO

本身是基于事件驱动思想来完成的,其主要想解决的是BIO的大并发问题: 在使用同步I/O的网络应用中,如果要同时处理多个客户端请求,或是在客户端要同时和多个服务器进行通讯,就必须使用多线程来处理。也就是说,将每一个客户端请求分配给一个线程来单独处理。这样做虽然可以达到我们的要求,但同时又会带来另外一个问题。由于每创建一个线程,就要为这个线程分配一定的内存空间(也叫工作存储器),而且操作系统本身也对线程的总数有一定的限制。如果客户端的请求过多,服务端程序可能会因为不堪重负而拒绝客户端的请求,甚至服务器可能会因此而瘫痪。 (用内核基于事件回掉机制一个线程轮询,代替大量线程阻塞)

JDK中 nio模型

在jdk中ServerSocketChannel(ServerSocket用于接收请求事件)、SocketChannel(socket读、写)、Selector(通道选择器)、ByteBuffer等API

AIO

与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。

即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。

在JDK1.7中,这部分内容被称作NIO.2,主要在java.nio.channels包下增加了下面四个异步通道:

AsynchronousSocketChannel

AsynchronousServerSocketChannel

AsynchronousFileChannel

AsynchronousDatagramChannel

其中的read/write方法,会返回一个带回调函数的对象,当执行完读取/写入操作后,直接调用回调函数。

实现原理

说道实现原理,还要从操作系统的IO模型上了解

按照《Unix网络编程》的划分,IO模型可以分为:阻塞IO、非阻塞IO、IO复用、信号驱动IO和异步IO,按照POSIX标准来划分只分为两类:同步IO和异步IO。如何区分呢?首先一个IO操作其实分成了 两个步骤:发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO ,因此阻塞IO、非阻塞IO、IO复用、信号驱动IO都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那么就是非阻塞IO。( 因此可以大胆猜测理解jdk AIO异步模型是基于api层事件驱动实现+NIO

操作系统io模型

说到操作系统的IO模型,又不得不提select/poll/epoll/iocp

首先我们来定义流的概念,一个流可以是文件,socket,pipe等等可以进行I/O操作的内核对象。

不管是文件,还是套接字,还是管道,我们都可以把他们看作流。

之后我们来讨论I/O的操作,通过read,我们可以从流中读入数据;通过write,我们可以往流写入数据。现在假定一个情形,我们需要从流中读数据,但是流中还没有数据,(典型的例子为,客户端要从socket读如数据,但是服务器还没有把数据传回来),这时候该怎么办?

  • 阻塞。阻塞是个什么概念呢?比如某个时候你在等快递,但是你不知道快递什么时候过来,而且你没有别的事可以干(或者说接下来的事要等快递来了才能做);那么你可以去睡觉了,因为你知道快递把货送来时一定会给你打个电话(假定一定能叫醒你)。

为了了解阻塞是如何进行的,我们来讨论缓冲区,以及内核缓冲区,最终把I/O事件解释清楚。缓冲区的引入是为了减少频繁I/O操作而引起频繁的系统调用(你知道它很慢的),当你操作一个流时,更多的是以缓冲区为单位进行操作,这是相对于用户空间而言。对于内核来说,也需要缓冲区。

假设有一个管道,进程A为管道的写入方,B为管道的读出方。

  1. 假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,这个事件姑且称之为"缓冲区非空"。
  2. 但是"缓冲区非空"事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为"缓冲区满"。
  3. 假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做"缓冲区非满"
  4. 也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为"缓冲区空"。

这四个情形涵盖了四个I/O事件, 缓冲区满,缓冲区空,缓冲区非空,缓冲区非满 (注都是说的内核缓冲区,且这四个术语都是我生造的,仅为解释其原理而造)。这四个I/O事件是 进行阻塞同步的根本,也是NIO模型被提出使用的原因 。(如果不能理解"同步"是什么概念,请学习操作系统的锁,信号量,条件变量等任务同步方面的相关知识)。

然后我们来说说阻塞I/O的缺点。但是阻塞I/O模式下,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),很不幸这两种方法效率都不高。

于是再来考虑非阻塞忙轮询的I/O方式,我们发现我们可以同时处理多个流了(把一个流从阻塞模式切换到非阻塞模式再此不予讨论):

1
2
3
4
5
6
while true {
for i in stream[]; {
if i has data
read until unavailable
}
}

我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费CPU。这里要补充一点,阻塞模式下,内核对于I/O事件的处理是阻塞或者唤醒,而非阻塞模式下则把I/O事件交给其他对象(后文介绍的select以及epoll)处理甚至直接忽略。

为了避免CPU空转,可以引进了一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理比较厉害,可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流(于是我们可以把"忙"字去掉了)。代码长这样:

1
2
3
4
5
6
7
while true {
select(streams[])
for i in streams[] {
if i has data
read until unavailable
}
}

于是,如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。

但是使用select,我们有O(n)的无差别轮询复杂度,同时处理的流越多,每一次无差别轮询时间就越长。再次

说了这么多,终于能好好解释epoll了

epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll之会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的。(复杂度降低到了O(k),k为产生I/O事件的流的个数,也有认为O(1)的[更新 1])

在讨论epoll的实现细节之前,先把epoll的相关操作列出[更新 2]:

  • epoll_create 创建一个epoll对象,一般epollfd = epoll_create()
  • epoll_ctl (epoll_add/epoll_del的合体),往epoll对象中增加/删除某一个流的某一个事件

比如

epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//有缓冲区内有数据时epoll_wait返回

epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//缓冲区可写入时epoll_wait返回

  • epoll_wait(epollfd,…)等待直到注册的事件发生

(注:当对一个非阻塞流的读写发生缓冲区满或缓冲区空,write/read会返回-1,并设置errno=EAGAIN。而epoll只关心缓冲区非满和缓冲区非空事件)。

一个epoll模式的代码大概的样子是:

1
2
3
4
5
6
while true {
active\_stream[] = epoll\_wait(epollfd)
for i in active\_stream[] {
read or write till unavailable
}
}

简单的的画了下自己理解的概念草图:

select/epoll白话文总结:select/epoll进程首先会copy所有对应等待的文件描述符fd(理解为文件指针的指针,select 1024,epoll无限制),当内核发现当前缓冲区满或空时触发一个i/o响应中断,告诉select/epoll进程,对于select来讲只是被告知触发的fd数,需要内核再次遍历下所有fd,然后返回可用fd并copy至用户态,下次发起select时会再次copy,与epoll不同的是,内核在poll时会回调epoll告诉具体可用fd,并存储在epoll维护的一个list里(基于红黑树)然后直接将可用list copy至用户态,并且在下次发起系统调用时,不需要再次进行第一次的fd copy.

可以理解的说明是:在Linux 2.6以后,java NIO的实现,是通过epoll来实现的,这点可以通过jdk的源代码发现。而AIO,在windows上是通过IOCP实现的,在linux上还是通过epoll来实现的,目前除了windows操作系统还不提供这种异步io实现机制。

这里强调一点:AIO,这是I/O处理模式,而epoll等都是实现AIO的一种编程模型;换句话说,AIO是一种接口标准,各家操作系统可以实现也可以不实现。在不同操作系统上在高并发情况下最好都采用操作系统推荐的方式。Linux上还没有真正实现网络方式的AIO。

底层基础 :

AIO实现

在windows上,AIO的实现是通过IOCP来完成的,看JDK的源代码,可以发现

WindowsAsynchronousSocketChannelImpl

看实现接口:

implements Iocp.OverlappedChannel

再看实现方法:里面的read0/write0方法是native方法,调用的jvm底层实现。

在linux上,AIO的实现是通过epoll来完成的,看JDK源码,可以发现,实现源码是:

UnixAsynchronousSocketChannelImpl

看实现接口:

implements Port.PollableChannel

这是与windows最大的区别,poll的实现,在linux2.6后,默认使用epoll。

NIO对应Buffer的选择

对于NIO来说,缓存的使用可以使用DirectByteBuffer和HeapByteBuffer。如果使用了DirectByteBuffer,一般来说可以减少一次系统空间到用户空间的拷贝。但Buffer创建和销毁的成本更高,更不宜维护,通常会用内存池来提高性能。如果数据量比较小的中小应用情况下,可以考虑使用heapBuffer;反之可以用directBuffer。

堆外内存有以下特点:

  • 对于大内存有良好的伸缩性,不受JVM gc控制适合保存长久对象 用户session等
  • 在进程间可以共享,减少虚拟机间的复制
  • 创建开销大,不受JVM管控,内存回收时通过DirectByteBuffer在jvm中引用回收时被回收。

NIO存在的问题

使用NIO != 高性能,当连接数&lt;1000,并发程度不高或者局域网环境下NIO并没有显著的性能优势。

NIO并没有完全屏蔽平台差异,它仍然是基于各个操作系统的I/O系统实现的,差异仍然存在。使用NIO做网络编程构建事件驱动模型并不容易,陷阱重重。

推荐大家使用成熟的NIO框架,如Netty,MINA等。解决了很多NIO的陷阱,并屏蔽了操作系统的差异,有较好的性能和编程模型。

epoll惊群:

惊群主要还是指在多线程共享一个epoll fd的时候,如果epoll_wait所等待的fd(比如一条tcp连接可读)有事件发生时,epoll不知道唤醒哪个线程,就会把所有线程都唤醒,但是最终只有一个线程能去处理,其他的线程都会返回。如果不是多线程共享一个epoll,那就不会有这样的问题。

Ngnix 的解决方法

Ngnix 目前有几种方法解决惊群问题。

accept_mutex 锁

如果开启了accept_mutex锁,每个 worker 都会先去抢自旋锁,只有抢占成功了,才把 socket 加入到 epoll 中,accept 请求,然后释放锁。accept_mutex锁也有负载均衡的作用。

accept_mutex效率低下,特别是在长连接的时候。因为长连接时,一个进程长时间占用accept_mutex锁,使得其它进程得不到 accept 的机会。因此不建议使用,默认是关闭的。

EPOLLEXCLUSIVE 标识

EPOLLEXCLUSIVE是4.5+内核新添加的一个 epoll 的标识,Ngnix 在 1.11.3 之后添加了NGX_EXCLUSIVE_EVENT。

EPOLLEXCLUSIVE标识会保证一个事件发生时候只有一个线程会被唤醒,以避免多侦听下的"惊群"问题。不过任一时候只能有一个工作线程调用 accept,限制了真正并行的吞吐量。

SO_REUSEPORT 选项

SO_REUSEPORT 是惊群最好的解决方法,Ngnix 在 1.9.1 中加入了这个选项,每个 worker 都有自己的 socket,这些 socket 都bind同一个端口。当新请求到来时,内核根据四元组信息进行负载均衡,非常高效。

LT DT:JDK水平触发,Netty边缘触发

netty jdk epoll bug

空轮询情况即:selector.select(timeoutMillis)操作会立即返回,不会阻塞timeoutMillis,导致 currentTimeNanos 几乎不变,这种情况下,会反复执行selector.select(timeoutMillis),变量selectCnt 会逐渐变大,当selectCnt 达到阈值,则执行rebuildSelector方法,进行selector重建,将出现bug的Selector上的channel重新注册到新的Selector上,解决cpu占用100%的bug。

Netty 解决TCP粘包拆包问题

为了解决TCP粘包、拆包导致的半包读写问题,Netty默认提供了多种编解码器用于处理半包,或者可以根据实际情况自行实现ChannelHandler来定制自己的应用协议栈,一般可以直接实现ByteToMessageDecoder。使用时只需要将需要的编解码器添加到channel的责任链上即可,Netty处理粘包拆包问题的核心思路就是:每次读取的时候,如果能读取到一个完整的数据包,才真正读取出来,一直读到没有数据可读,如果有半包消息,则保存下来未处理的半包消息,下次读事件触发的时候,将未处理的半包消息和新的消息内容合并在一起再继续处理。最后将所有解析出来的完整数据包依次进行fireChannelRead事件的传播,进行后续的业务处理。

接下来我会整理Tomcat servlet相关服务器模型、NIO在Netty中实战,及Netty在dubbo、zk中应用。

参考:

https://www.jianshu.com/p/db5da880154a

https://tech.meituan.com/nio.html

OceanBase(中文名"海钡云")是阿里巴巴/蚂蚁金服自主研发的面向云时代的关系数据库,目前,OceanBase已经应用于蚂蚁金服会员、交易、支付、账务、计费等核心系统和网商银行等业务系统,同时也支撑着双11用户每一笔订单背后的数据和事务处理,在阿里电商、金融、云服务领域大放异彩。

    - 可扩展,分布式系统,支持ACID、无缝扩容,兼具分布式系统与关系型数据库的优点。

    - 高可用,具备多可用区部署能力, 可抵御少数可用区失败。

    - 兼容MySQL协议。MySQL用户可以无缝切换。

    - 支持多租户,在数据库进程层面作资源隔离,极高的资源整合度。

    - 高性能,内存型数据库,充分利用SSD等新硬件。

- 诞生

随着淘宝业务的飞速发展,数百亿条记录、数十TB、数万TPS、数十万QPS这样的压力让传统关系数据库不堪重负,单纯的硬件升级以无法使问题得以解决,分表分库也总不是奏效,分库后分布式事务一致性性能低下等一系列问题。。

设计思路

思路1
常见的做法根据业务特点对数据库进行水平拆分,路由取模到不同数据库服务器上,这种思路的弊端:

1、节点动态伸缩很被动,操作复杂,往往需要人工介入;
2、多表关联查询、写入效率低下;
3、目前广泛的关系型数据库存储引擎是针对机械硬盘特点设计,无法发挥SSD(固态硬盘)高性能能力(传统硬盘包 含有一个物理读写头,一次可以跨多个物理盘片读取数据流。如果数据可以顺序读取(大的多媒体应用这种模式比较适合)如果读取数据要搜索盘片的多个扇区,那么传统硬盘读写头的性能会急剧下降。
与此相反,闪存驱动的物理构成就是成百上千个可以随机访问的块,是由分散的许多芯片组成的,读取哪一块的数据不会影响访问性能。随着SSD价格不断降低,容量和性能不断提升,SSD取代磁盘只是个时间问题。)

思路2
参考分布式表格系统做法,例如Google的Bigtable,HBase这种底层基于分布式文件存储架构的设计思路,将大表分为几十万、几百万个子表,子表之间按照主键有序,服务节点的动态伸缩对于调用者是透明的,解决了可扩展性问题,和范围查询问题。弊端是无法支持事务,Bigtable、HBase支持行锁级别保证原子性,不支持跨表跨行事务。如果在此基础上引入基于二阶段提交的分布式事务则会导致性能低下,这种思路在Google的Percolator系统中得到了体现,其平均响应时间2-5s!。

新的设计思路:

写操作放在updateServer单服务上,优先基于内存存储作为增量数据保存,由于一天的写操作不会很多,巧妙的支持了高并发下事务,同时ChunkServer保存全量基线历史数据,MergerServer合并数据,在读操作时,ChunkServer会和UpdateServer上数据合并再返回,同时,夜间低峰期主动和UpdateServer增量数据合并,保证UpdateServer服务高性能数据不会积压太多。

和传统关系数据库比较:

1、高可用方面:


高可用,具备多可用区部署能力, 可抵御少数可用区失败。强一致性同步,不丢失数据,满足了CA,基于分布式选举算法在故障节点出现后自动剔除选出新节点,分3机房部署。(Google Chubby系统的发明者说过一句话,这个世界上所有的高可用强一致的协议都是Paxos或者Paxos的变种)oracle RAC不支持跨机房部署。

2、数据库引擎:

第一类是传统关系数据库引擎。关系数据库本质上是面向磁盘设计的,它把数据分成很多很多的页面,通过页面缓存机制来管理页面,底层的索引是B+树。这种引擎发展得非常成熟,但是写入性能差,原因是关系数据库的写入放大效应。用户数据是按行写入的,但是关系数据库却是按页面管理的,有时只写了一行数据,却不得不把整个页面刷入磁盘。另外一类互联网公司流行的引擎是LSM树。Google里面有一个很有名的开源系统LevelDB,Facebook基于它又开发了一个类似的系统RocksDB。这种引擎采用基线加增量的设计,数据首先以增量的形式写入到内存中,当增量达到一个阀值时统一写到磁盘。这种做法的好处是解决了关系数据库写入放大的问题,但是它的合并代价会比较大,可能出现合并速度赶不上内存写入的情况。

经验&思考

第一点关于高可用。首先,强一致加高可用可能是未来云数据库的标配。我们以前做应用架构和存储选型的时候会谈很多东西,比如数据库异步复制提高性能,或者CAP理论导致一致性和高可用不可兼得,等等。然而,通过OceanBase的实践我们也已经得出一个结论,实现强一致相比实现弱一致性能开销不是那么大,性能的损耗比较低,而且可以获得非常多的好处,这一定是以后的趋势。另外,实现云数据库级别的高可用底层一定要用Paxos协议,回避该协议的实现方案本质上都是耍流氓。目前我们已经从各大互联网公司,包括Google、Amazon以及Alibaba的云数据库实践看到了这一趋势。
第二点关于自动化。以前传统的关系数据库规模往往比较小,只有少数几台机器,有一个DBA运维就够了。然而,在云数据库时代,一定要做规模化运维,一个DBA对几千甚至上万机器,达到这个量级一定要做自动化。强一致是自动化的前提,没有强一致很难自动化,主库故障备库会丢数据,自动化是很难的。虽然每次宕机概率很低,但是规模上来概率就高了。
另外云数据库设计的时候也会有很多不一样的考虑,尽可能地减少人工干预。例如数据库配置项,或者比较流行的SQL Hint,在云数据库时代一定需要尽可能减少。系统设计者需要在服务端自己消化,不要把灵活性留给运维的人员,因为运维的人要做的就是规模化运维。
最后一点关于成本。成本是云的关键,很多用户上云就是为了节省成本。首先,性能不等于成本。性能虽然很重要,但是成本需要考虑更多的因素,性能、压缩比、利用率、规模化运维,等等。以利用率为例,云数据库的一个成本节省利器就是提高机器利用率。在云数据库中,可以通过分布式方案把整个利用率提高来,即使单机性能类似,但是利用率提上来以后,成本会特别的低。

参考:https://mp.weixin.qq.com/s/CXDrXxYV07O2JF8fIulqow


Spring Cloud Eureka是Spring Cloud Netflix微服务套件中的一部分,基于Netflix Eureka做了二次封装,主要负责完成微服务架构中的服务治理功能。出于好奇想知道Eureka可以注册的最大服务实例数是多少,于是有了下面的测试。

Eureka服务注册与发现相关源码介绍

eureka整体架构图(图片来源网络)

可以看到eueka按逻辑上可以划分为3个模块:
eureka-server、eureka-client-service-provider、eureka-client-service-consumer。

  • eureka-server:服务端,提供服务注册和发现。
  • eureka-client-service-provider:客户端,服务提供者,通过http rest告知服务端注册,更新,取消服务。
  • eureka-client-service-consumer:客户端,服务消费者,通过http rest从服务端获取需要服务的地址列表,然后配合一些负载均衡策略(ribbon)来调用服务端服务。

首先, 对于服务注册中心、服务提供者、服务消费者这三个主要元素来说,后者(Eureka客户端)在整个运行机制中是大部分通信行为的主动发起者,而注册中心主要是处理请求的接收者。所以以下会我们会分别基于Eureka客户端、服务端入手看看他们是如何完成这些行为的。

client端:

主要做了如下几件事:

  • 服务注册(Registry)——初始化时执行一次,向服务端注册自己服务实例节点信息包括ip、端口、实例名等,基于POST请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public EurekaHttpResponse Void register(InstanceInfo info) {

String urlPath = &amp;quot;apps/&amp;quot; + info.getAppName();

ClientResponse response = null;

try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();

addExtraHeaders(resourceBuilder);

response = resourceBuilder

.header(&amp;quot;Accept-Encoding&amp;quot;, &amp;quot;gzip&amp;quot;)

.type(MediaType.APPLICATION\_JSON\_TYPE)

.accept(MediaType.APPLICATION\_JSON)

.post(ClientResponse.class, info);

return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {

logger.debug(&amp;quot;Jersey HTTP POST {}/{} with instance {}; statusCode={}&amp;quot;, serviceUrl, urlPath, info.getId(),

response == null ? &amp;quot;N/A&amp;quot; : response.getStatus());

}

if (response != null) {

response.close();

}
}
}
  • 服务续约(renew)——每隔30s向服务端PUT一次,保证当前服务节点状态信息实时更新,不被服务端失效剔除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public EurekaHttpResponse InstanceInfo sendHeartBeat   (String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = apps/+ appName +/+ id;

ClientResponse response = null;

try {

WebResource webResource = jerseyClient.resource(serviceUrl)

.path(urlPath)

.queryParam(status,info.getStatus().toString())

.queryParam(lastDirtyTimestamp,info.getLastDirtyTimestamp().toString());

if (overriddenStatus != null) {

webResource = webResource.queryParam(&amp;quot;overriddenstatus&amp;quot;, overriddenStatus.name());

}

Builder requestBuilder = webResource.getRequestBuilder();

addExtraHeaders(requestBuilder);

response = requestBuilder.put(ClientResponse.class);

EurekaHttpResponseBuilder&amp;lt;InstanceInfo&amp;gt; eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));

if (response.hasEntity()) {

eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));

}

return eurekaResponseBuilder.build();

} finally {

if (logger.isDebugEnabled()) {

logger.debug(Jersey HTTP PUT {}/{};
statusCode={};, serviceUrl, urlPath, response == null ? : response.getStatus());

}

if (response !=null ) {

response.close();
}
}
}
  • 更新已经注册服务列表(fetchRegistry)——每隔30s从服务端GET一次增量版本信息,然后和本地比较并合并,保证本地能获取到其他节点最新注册信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

private EurekaHttpResponse InstanceInfo getInstanceInternal (String urlPath) {

ClientResponse response = null ;

try {
Builder requestBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();

addExtraHeaders(requestBuilder);

response = requestBuilder.accept(MediaType.APPLICATION\_JSON\_TYPE).get(ClientResponse.class);

InstanceInfo infoFromPeer = null ;

if (response.getStatus() == Status.OK.getStatusCode() &amp;amp;&amp;amp; response.hasEntity()) {

infoFromPeer = response.getEntity(InstanceInfo.class);

}

return anEurekaHttpResponse(response.getStatus(), InstanceInfo.class)

.headers(headersOf(response))

.entity(infoFromPeer)

.build();
} finally{

if(logger.isDebugEnabled()) {
logger.debug(&amp;quot;Jersey HTTP GET {}/{}; statusCode={}&amp;quot;, serviceUrl, urlPath, response ==null? &amp;quot;N/A&amp;quot; : response.getStatus());

}

if (response != null ) {
response.close();
}
}
}
  • 服务下线(cancel)——在服务shutdown的时候,需要及时通知服务端把自己剔除,以避免客户端调用已经下线的服务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public  EurekaHttpResponse Void cancel   (String appName, String id) {

String urlPath = apps/ + appName +/ + id;

ClientResponse response = null;

try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();

addExtraHeaders(resourceBuilder);

response = resourceBuilder.delete(ClientResponse.class);

return
anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
}
finally{
if (logger.isDebugEnabled()) {

logger.debug(&amp;quot;Jersey HTTP DELETE {}/{}; statusCode={}&amp;quot;, serviceUrl, urlPath, response == \*\*null\*\*? &amp;quot;N/A&amp;quot; : response.getStatus());

}

if (response != null ) {
response.close();
}
}
}
Server端:

我们知道eureka client是通过Jersey Client基于Http协议与eureka server交互来注册服务、续约服务、取消服务、服务查询等。同时,Server端还会维护一份服务实例清单,并每隔90s对未续约的实例进行失效剔除。所以,eureka server肯定要提供上述http的服务端的Jersey Server实现,由于此次测试针对客户端模拟,服务端对应接口就先不在这描述了。

测试过程

  • 测试工具:
工具选项 描述 配置/能力
测试机器 CentOS release 5.4 (Final) 3台 8c16g,open files:65535,max user processes:65535
Eureka服务端 单机部署,boot版本(Dalston.SR5) -XX:MaxHeapSize=4g(默认)
Wireshark Windows平台抓包工具 抓取HTTP、TCP等报文内容、协议相关信息
UAV监控 公司自研监控平台 实时监控采集应用性能指标

-
测试方案:
1、先启动多组Eureka客户端并用Wireshark抓取其真实请求,然后结合Eureka源码分析其调用逻辑关系(基于TCP短链接交互)。
2、根据源码,将客户端http调用方式进行池化,即每笔实例注册流程:(注册、获取实例、续约)等调用请求统一从连接池获取连接,获取实例过程为每次获取delta(增量)。
3、每笔流程完成后sleep0.5s,保证所有节点续约、获取实例(间隔30s)的频率对服务端负载均匀。
4、串行模拟整个流程,每完成500笔,整体观察5分钟记录服务端cpu、内存、线程、连接数等信息。直到服务端或客户端出现大量异常(超时、失效剔除等可能异常)则认为到达eureka注册瓶颈。
5、更改服务端servlet容器配置,尝试进行优化(最大连接数、线程数等)从新开始流程测试直到最优。

模拟测试流程图

  • 数据记录

可以看出到实例注册数到7000多时候,连接数不稳定飙到10000左右,同时此时客户端开始大量报错超时,服务端开始拒绝连接:

此时连接数截图详细,可以清楚看到conns达到10000阀值后接着下降:

jstack-F 后显示大量tomcat workThreadPoolExecutor线程block在Socket上:

此时结果和预期猜测一样。MaxConnection=10000,且大于AcceptCount=100时,Tomcat会触发拒绝连接。

而我们使用的spring boot版本使用内嵌Tomcat版本8.5 接着改了服务端tomcat配置改成如下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override

public void customize (Connector connector) {
Http11NioProtocol protocol = (Http11NioProtocol) connector.getProtocolHandler();

//设置最大连接数
protocol.setMaxConnections(20000);// 10000

// 设置最大线程数

protocol.setMaxThreads(5000);// 200

// protocol.setConnectionTimeout(1); // 20s

protocol.setAcceptCount(1000); // 100

// protocol.setKeepAliveTimeout(1);

System.out.println(protocol.get() + protocol.getMaxConnections());

}

再次从新开始一轮测试,数据记录如下:

实例数 7100 8000
cpu 749% 790%
mem 17.7 18%
conn 12007 13690
Threads 1982 1997

可以看出,在修改了tomcat对应配置,将最大连接数调至20000,线程数调至5000后,Eureka可注册的实例数突破了7000,连接数也突破了10000,实例数注册到8000后才开始报错,看出此时cpu已经接近满负载,操作系统本身调度已经压到极限,于是结束了本次测试。

注意:有一种误解,就是我们常说一台机器有65536个端口,那么承载的连接数就是65536个,这个说法是极其错误的,这就混淆了源端口和访问目标端口。系统是通过一个四元组来标识一个TCP连接,(src_ip,src_port,dst_ip,dst_port)即源IP、源端口、目标IP、目标端口。比如我们有一台服务192.168.0.1,开启端口80.那么所有的客户端都会连接到这台服务的80端口上面,我们做压测的时候,利用压测客户端,这个客户端的连接数是受到端口数的限制,但是服务器上面的连接数可以达到成千上万个,一般可以达到百万(4C8G配置),至于上限是多少,需要看其自身优化的程度(可通过操作系统的文件句柄数量限制、TCP参数进行调优),但是参数值并不是设置的越大越好,有的需要考虑服务器的硬件配置,参数对服务器上其它服务的影响等。

结论: Eureka Server服务实例注册量的负载值和操作系统、应用容器本身对应的配置相关,调整操作系统可打开最大文件句柄、进程数,调整应用容器相关最大连接数、线程数、NIO服务器模型引入等等手段都可提高我们应用服务整体吞吐量。

参考资料:
https://www.jianshu.com/p/5ffb71b4c13d
https://tomcat.apache.org/tomcat-8.0-doc/config/http.html
http://yeming.me/2016/12/01/eureka1/