多个进程绑定(bind)同一个端口,当客户断发起连接(connect)时,内核会通过一个hash算法决定分配到那个进程上。
以下是未优化前的Linux 4.3内核的实现,可见是多么地不直观。它采用了遍历HASH冲突链表的方式进行reuseport套接字的精确定位:
result = NULL;
badness = 0;
udp_portaddr_for_each_entry_rcu(sk, node, &hslot2->head) {
score = compute_score2(sk, net, saddr, sport,
daddr, hnum, dif);
if (score > badness) { // 冒泡排序
// 找到了更加合适的socket,需要重新hash
result = sk;
badness = score;
reuseport = sk->sk_reuseport;
if (reuseport) {
hash = udp_ehashfn(net, daddr, hnum,
saddr, sport);
matches = 1;
}
} else if (score == badness && reuseport) { // reuseport套接字散列定位
// 找到了同样reuseport的socket,进行定位
matches++;
if (reciprocal_scale(hash, matches) == 0)
result = sk;
hash = next_pseudo_random32(hash);
}
}
我们来看看在4.5和4.6内核中对于reuseport的查找增加了一些什么神奇的新东西:
result = NULL;
badness = 0;
udp_portaddr_for_each_entry_rcu(sk, node, &hslot2->head) {
score = compute_score2(sk, net, saddr, sport,
daddr, hnum, dif);
if (score > badness) {
// 在reuseport情形下,意味着找到了更加合适的socket组,需要重新hash
result = sk;
badness = score;
reuseport = sk->sk_reuseport;
if (reuseport) {
hash = udp_ehashfn(net, daddr, hnum,
saddr, sport);
if (select_ok) {
struct sock *sk2;
// 找到了一个组,接着进行组内hash。
sk2 = reuseport_select_sock(sk, hash, skb,
sizeof(struct udphdr));
if (sk2) {
result = sk2;
select_ok = false;
goto found;
}
}
matches = 1;
}
} else if (score == badness && reuseport) {
// 这个else if分支的期待是,在分层查找不适用的时候,寻找更加匹配的reuseport组,注意4.5/4.6以后直接寻找的是一个reuseport组。
// 在某种意义上,这回退到了4.5之前的算法。
matches++;
if (reciprocal_scale(hash, matches) == 0)
result = sk;
hash = next_pseudo_random32(hash);
}
}
struct sock *reuseport_select_sock(struct sock *sk,
u32 hash,
struct sk_buff *skb,
int hdr_len)
{
...
prog = rcu_dereference(reuse->prog);
socks = READ_ONCE(reuse->num_socks);
if (likely(socks)) {
/* paired with smp_wmb() in reuseport_add_sock() */
smp_rmb();
if (prog && skb) // 可以用BPF来从用户态注入自己的定位逻辑,更好实现基于策略的负载均衡
sk2 = run_bpf(reuse, socks, prog, skb, hdr_len);
else
// reciprocal_scale简单地将结果限制在了[0,socks)这个区间内
sk2 = reuse->socks[reciprocal_scale(hash, socks)];
}
...
}
单机上的 连接服务器 则可以用端口复用的方式实现负载均衡;也完美解决了nginx之前的惊群现象,也不需要像nginx后来的做法去避免惊群。
下面给出测试用的demo
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#define MAXLINE 100
void* thread_(void* agr)
{
int listenfd,connfd;
struct sockaddr_in servaddr;
char buff[MAXLINE+1];
time_t ticks;
unsigned short port;
int flag=1,len=sizeof(int);
port=10013;
if( (listenfd=socket(AF_INET,SOCK_STREAM,0)) == -1)
{
perror("socket");
exit(1);
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
servaddr.sin_port=htons(port);
//SO_REUSEPORT
if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, len) == -1)
{
perror("SO_REUSEADDR");
exit(1);
}
if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &flag, len) == -1)
{
perror("SO_REUSEPORT");
exit(1);
}
if( bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) ==-1)
{
perror("bind");
exit(1);
}
else
printf("bind call OK!\n");
if( listen(listenfd,5) == -1)
{
perror("listen");
exit(1);
}
char buf[] = "hello world.";
for(;;) {
if( (connfd=accept(listenfd,(struct sockaddr*)NULL,NULL)) == -1)
{
perror("accept");
exit(1);
}
send(connfd,buf,sizeof(buf),0);
close(connfd);
printf("pid %d : once\n",pthread_self());
}
}
int main(int argc, char** argv)
{
pthread_t pt;
if( 0!=pthread_create(&pt,NULL,thread_,(void*)0) )
{
perror("pthread_create");
}
thread_((void*)1);
return 0;
}
原文:http://www.cnblogs.com/jcj-520/p/7001102.html