brpc internal
文章目录
brpc 内部实现
brpc源码级分享
thread model
pthread 1:1atomic cache同步降低性能
fiber n:1 -> nginx 多核难以扩展, 用户不能做阻塞操作.
context 存储contextualStack
bthread_make_fcontext (boost::context)手动切换线程上下文->函数栈, 寄存器
bthread_jump_fcontext 将context解包, 恢复上下文
防止栈溢出: 1. mprotect to add a guarding page (并发bthread 过多会导致mmap失败)
2.use mmap to alloc page-aligned memory
schedule 每一个pthread有一个taskgroup
basic-> run_queue(FIFO)
remotetaskqueue->备份二级队列, 向队列中提交不在btrhead中创建的任务
taskgroup逻辑
1 2 3 4 |
while not stop wait until signaled work stealing from other sched_to(that_bthread) |
work stealing
futex_wait_private(value, expect) #atomic 系统调用原语 Parkinglot封装, wait signal
避免全局竞争的方法:
很多个parkinglot, 进行worker分组, 改成局部竞争, 只唤醒组内部分worker
- workstealingqueue: 从这个队列里偷
TaskControl
- 单例管理所有的taskgroups
- 如果bthread在non-worker的pthreads中创建的时候, task_control选择一个taskgroup, 把这个btrhead写到remoteTaskQueue中
- signal-task 唤醒通知一部分worker来偷
- steal-task 从所有taskgroup中偷, 避免饿死
bthread执行优先级
local queue->remote queue->other worker local queue->other worker remote taskqueue
bthread_t
32bit 版本号(防止aba问题) + 32bit slot id(resourcepool中的下标)->taskMeta bthread的管理结构
start_foreground ->set_mained(ready_to_run + sched_to) 直接跑新的, 正在跑的放队尾
start_background ->ready_to_run
阻塞操作
yield -> 把当前运行环境空出来, 加到queue尾
usleep()->把当前任务从runqueue pop 定时加到remote_queue
bthread_id
- client端, 标记每个requestid用来区分response
- 保证rpc context 线程安全
- 用来cancel rpc
- 防止ABA问题
具体来说,bthread_id解决的问题有:
- 在发送RPC过程中response回来了,处理response的代码和发送代码产生竞争。
设置timer后很快触发了,超时处理代码和发送代码产生竞争。
重试产生的多个response同时回来产生的竞争。
通过correlation_id在O(1)时间内找到对应的RPC上下文,而无需建立从correlation_id到RPC上下文的全局哈希表。
取消RPC。
运行
- eventDispatcher:run EPOLLIN epoll bthread
- 当前线程起一个bthread, read/cut messages 这样没有专门的io线程, 避免全局竞争, 如果某个callback阻塞, 其他线程会吧read/cut 偷走继续运行
- callback 业务函数
对于每个fd, 最多有一个I和O bthread
注意:
在brpc中加mutex锁, brpc请求下游模块, 没有worker资源处理下游返回的read..无法释放mutex锁
buffer management->IOBuf
IOBuf->BlockRef->Block 三层结构
非连续的存储, 每个block引用计数, 避免频繁new, 使用cache in TLSData
timer keeping
定时器分桶, 每个结构要存的: task_list, mutex, _nearest_run_time 无锁无竞争
拿到所有定时器时, _nearset_run_time建一个全局堆
1 2 3 4 5 6 7 |
while not stop check each bucket for new tasks build a heap (remove deleted tasks) pop first task from heap run if timeout cal the next timeout futex_wait_private(如果有更紧急的timer插入, 会唤醒timer线程) |
Memory management 专用分配, 固定size
BlockGroup / Block / T 类似于buddy 算法
1
|
resourceid = group_idex + block_offset + slot_offset |
I/O model
naming & load balancer
- control reverse 观察者模式, 服务器变化后,由naming调用lb中 add, remove, reset三个接口
- DoublyBufferedData, TLS lock每次读前台时, 后台切换时, 从每个线程lock一个锁, 读完之后切换
Bvar & sampling
每个写bvar的时候, 只写tls . 读的时候combine.
文章作者 Sun.StriKE
上次更新 2019-03-27 (417d97f)