First, the question from a group member; the analysis follows.
I've recently been testing low-latency multicast in Rust with tokio and socket2.
The machine is an AWS c7gn.metal, Arm architecture, 64 cores. One sender, n receivers; the sender transmits timestamps. The Linux route is configured to go through the lo interface.
With up to about 20 receivers, latency is controllable at under roughly 40us. Once the receiver count grows to 50-100, latency climbs to 100-200us and beyond. CPU usage remains low, yet latency increases significantly. Why?
How can 50+ receivers all receive the data while keeping latency under 50us?
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::{SystemTime, UNIX_EPOCH};
const MULTICAST_ADDR: Ipv4Addr = Ipv4Addr::new(239, 255, 0, 1);
const MULTICAST_PORT: u16 = 3001;
const BIND_ADDR: Ipv4Addr = Ipv4Addr::LOCALHOST;
pub fn timestamp16() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros()
}
/// Networking options.
#[derive(argh::FromArgs)]
struct Args {
/// multicast address that the socket must join
#[argh(option, short = 'a', default = "MULTICAST_ADDR")]
addr: Ipv4Addr,
/// specific port to bind the socket to
#[argh(option, short = 'p', default = "MULTICAST_PORT")]
port: u16,
/// run as the multicast sender (default: receiver)
#[argh(switch)]
is_sender: bool,
}
fn main() -> std::io::Result<()> {
use socket2::{Domain, Protocol, Socket, Type};
let Args {
addr,
port,
is_sender,
} = argh::from_env();
println!("{} {} is_sender: {}", addr, port, is_sender);
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
socket.set_nonblocking(true)?;
socket.set_reuse_address(true)?;
socket.set_reuse_port(true)?;
socket.set_multicast_loop_v4(true)?;
socket.set_multicast_ttl_v4(1)?;
socket.join_multicast_v4(&addr, &Ipv4Addr::LOCALHOST)?;
let fin_addr = SocketAddrV4::new(addr, port);
if is_sender {
socket.bind(&SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into())?;
} else {
socket.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port).into())?;
}
let runtime = tokio::runtime::Builder::new_current_thread()
.thread_name("network")
.enable_all()
.build()?;
let udp = {
let _guard = runtime.enter();
tokio::net::UdpSocket::from_std(socket.into())?
};
runtime.block_on(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(1000));
if !is_sender {
// Receivers never send; make the tick effectively never fire.
interval = tokio::time::interval(tokio::time::Duration::from_secs(60 * 60 * 24));
}
interval.tick().await;
let mut buf = [0; 16];
loop {
tokio::select! {
recv_res = udp.recv_from(&mut buf) => {
let (count, remote_addr) = recv_res.expect("cannot receive from socket");
let parsed = u128::from_be_bytes(buf[..count].try_into().unwrap());
let cost = timestamp16() - parsed;
println!("recv {remote_addr} {parsed} {count} {cost}");
}
_ = interval.tick() => {
let cur = timestamp16();
let input = cur.to_be_bytes();
udp.send_to(&input, fin_addr).await.expect("cannot send message to socket");
println!("send: {}", cur);
}
}
}
});
Ok(())
}
At first glance, intuition says that adding receivers should not make latency grow linearly. So where is the problem?
The problem is not the CPU, and not core pinning: it is the time spent in send_to. tokio's send_to is ultimately the sendto syscall, and on Linux the cost of a local multicast sendto grows with the number of listeners, because the kernel clones an skb and runs the per-socket delivery path for each one, roughly 2us per listener. The StackOverflow answer referenced below explains it: "For example, let's say with no listeners the base time for our sendto() call is 5 microseconds. Each additional listener increases the time of the sendto() call by about 2 microseconds. So if we have 10 listeners, now the sendto() call takes 2*10+5 = 25 microseconds." By that model, 100 listeners cost roughly 100*2 + 5 = 205us, which matches the measurements in the question.
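The linear model above can be written down and checked directly. Note the 5us base and 2us-per-listener slope are illustrative figures from the quoted answer, not kernel constants; on real hardware you would measure them:

```rust
/// Illustrative cost model for a local-multicast sendto() call, following the
/// quoted StackOverflow answer: a fixed syscall base plus a per-listener cost.
/// BASE_US and PER_LISTENER_US are example figures, not kernel constants.
fn sendto_cost_us(listeners: u32) -> u32 {
    const BASE_US: u32 = 5; // syscall overhead with no local listeners
    const PER_LISTENER_US: u32 = 2; // skb clone + per-socket delivery
    BASE_US + PER_LISTENER_US * listeners
}

fn main() {
    // Matches the 2*10+5 = 25us example and the ~205us figure at 100 receivers.
    println!("10 listeners: {}us", sendto_cost_us(10));
    println!("100 listeners: {}us", sendto_cost_us(100));
}
```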
Linux multicast performance is poor here: the kernel copies the message to every receiver. So how should the processes communicate instead? IPC mechanisms are a good fit, e.g. shared memory or Unix domain sockets. Shared memory has lower latency than Unix sockets, and a busy-wait scheme has lower latency than a select-based one, but burns more CPU; choose according to that trade-off.
The same answer adds a caveat: "there’s a caveat to shared memory: at least in our experience, you don’t get much of a latency improvement if you still rely on the kernel to wake-up the receiving thread. (E.g. if the reader is based around select()). The least-latency approach is a busy-wait scheme, but you burn up a whole CPU. If you have more threads than CPUs, then you get into a difficult 'optimization with trade-offs' problem."
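The busy-wait idea can be sketched in-process with plain atomics. This is a minimal sketch, not the author's implementation: the writer publishes a timestamp into a single atomic word and the reader spins on it, so the hot path has no syscall and no scheduler wake-up. In a real multi-process design this word would live in an mmap'ed shared-memory segment rather than an Arc, and the function name here is my own:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn now_us() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros() as u64
}

/// Hand one timestamp from a writer thread to a busy-waiting reader through a
/// single atomic word; returns the observed handoff latency in microseconds.
/// In a cross-process setup the word would sit in shared memory, not an Arc.
fn spin_handoff_latency_us() -> u64 {
    let slot = Arc::new(AtomicU64::new(0)); // 0 means "empty"

    let reader = {
        let slot = Arc::clone(&slot);
        thread::spawn(move || loop {
            // Busy-wait: no syscall, no kernel wake-up on the hot path.
            let sent = slot.swap(0, Ordering::Acquire);
            if sent != 0 {
                return now_us().saturating_sub(sent);
            }
            std::hint::spin_loop();
        })
    };

    // Let the reader start spinning before we publish.
    thread::sleep(Duration::from_millis(10));
    slot.store(now_us(), Ordering::Release);

    reader.join().unwrap()
}

fn main() {
    println!("spin handoff latency: {}us", spin_handoff_latency_us());
}
```

The swap-to-zero doubles as consumption acknowledgement; a production ring buffer would use sequence numbers instead so no message can be overwritten unread. The trade-off from the quote applies: the reader pins one CPU at 100% while it spins.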
Reference: https://stackoverflow.com/questions/6866611/linux-multicast-sendto-performance-degrades-with-local-listeners