blob: f64195a3c092de1f1bae4b5ea4e3fd414af8be04 [file] [log] [blame]
/* SPDX-License-Identifier: MIT */
/*
* Sample program that can act either as a packet sink, where it just receives
* packets and doesn't do anything with them, or it can act as a proxy where it
* receives packets and then sends them to a new destination.
*
* Examples:
*
* Act as a proxy, listening on port 4444, and send data to 192.168.2.6 on port
* 4445. Use multishot receive, DEFER_TASKRUN, and fixed files
*
* ./proxy -m1 -d1 -f1 -r4444 -H 192.168.2.6 -p4445
*
* Act as a bi-directional proxy, listening on port 8888, and send data back
* and forth between host and 192.168.2.6 on port 22. Use multishot receive,
* DEFER_TASKRUN, fixed files, and buffers of size 1500.
*
* ./proxy -m1 -d1 -f1 -B1 -b1500 -r8888 -H 192.168.2.6 -p22
*
* Act a sink, listening on port 4445, using multishot receive, DEFER_TASKRUN,
* and fixed files:
*
* ./proxy -m1 -d1 -s1 -f1 -r4445
*
* Run with -h to see a list of options, and their defaults.
*
* (C) 2024 Jens Axboe <axboe@kernel.dk>
*
*/
#include <fcntl.h>
#include <stdint.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
#include <liburing.h>
#include "proxy.h"
#include "list.h"
static int start_bgid = 1;
static int nr_conns;
static int open_conns;
static long page_size;
static unsigned long event_loops;
static unsigned long events;
static int mshot = 1;
static int sqpoll;
static int defer_tw = 1;
static int is_sink;
static int fixed_files = 1;
static char *host = "192.168.2.6";
static int send_port = 4445;
static int receive_port = 4444;
static int buf_size = 32;
static int bidi;
static int ipv6;
static int napi;
static int napi_timeout;
static int wait_batch = 1;
static int wait_usec = 1000000;
static int verbose;
static int nr_bufs = 256;
static int br_mask;
struct pending_send {
struct list_head list;
int fd, bgid, bid, len;
void *data;
};
/*
* Per socket stats per connection. For bi-directional, we'll have both
* sends and receives on each socket, this helps track them seperately.
* For sink or one directional, each of the two stats will be only sends
* or receives, not both.
*/
struct conn_dir {
int pending_shutdown;
int pending_sends;
struct list_head send_list;
int rcv, rcv_shrt;
int snd, snd_shrt;
int snd_busy;
unsigned long in_bytes, out_bytes;
int bgid_switch;
int mshot_resubmit;
};
enum {
CONN_F_DISCONNECTING = 1,
CONN_F_DISCONNECTED = 2,
CONN_F_PENDING_SHUTDOWN = 4,
CONN_F_STATS_SHOWN = 8,
CONN_F_END_TIME = 16,
};
#define NR_BUF_RINGS 2
struct conn_buf_ring {
struct io_uring_buf_ring *br;
void *buf;
int bgid;
};
struct conn {
struct conn_buf_ring brs[NR_BUF_RINGS];
struct conn_buf_ring *cur_br;
int tid;
int in_fd, out_fd;
int start_bgid;
int cur_br_index;
int pending_cancels;
int flags;
struct conn_dir cd[2];
struct timeval start_time, end_time;
union {
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
};
};
#define MAX_CONNS 1024
static struct conn conns[MAX_CONNS];
#define vlog(str, ...) do { \
if (verbose) \
printf(str, ##__VA_ARGS__); \
} while (0)
static struct conn *cqe_to_conn(struct io_uring_cqe *cqe)
{
struct userdata ud = { .val = cqe->user_data };
return &conns[ud.op_tid & TID_MASK];
}
static struct conn_dir *fd_to_conn_dir(struct conn *c, int fd)
{
return &c->cd[fd != c->in_fd];
}
/*
* Goes from accept new connection -> create socket, connect to end
* point, prepare recv, on receive do send (unless sink). If either ends
* disconnects, we transition to shutdown and then close.
*/
enum {
__ACCEPT = 1,
__SOCK = 2,
__CONNECT = 3,
__RECV = 4,
__SEND = 5,
__SHUTDOWN = 6,
__CANCEL = 7,
__CLOSE = 8,
};
struct error_handler {
const char *name;
int (*error_fn)(struct error_handler *, struct io_uring *, struct io_uring_cqe *);
};
static int recv_error(struct error_handler *err, struct io_uring *ring,
struct io_uring_cqe *cqe);
static int default_error(struct error_handler *err, struct io_uring *ring,
struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
fprintf(stderr, "%d: %s error %s\n", c->tid, err->name, strerror(-cqe->res));
fprintf(stderr, "fd=%d, bgid=%d, bid=%d\n", cqe_to_fd(cqe),
cqe_to_bgid(cqe), cqe_to_bid(cqe));
return 1;
}
/*
* Move error handling out of the normal handling path, cleanly seperating
* them. If an opcode doesn't need any error handling, set it to NULL. If
* it wants to stop the connection at that point and not do anything else,
* then the default handler can be used. Only receive has proper error
* handling, as we can get -ENOBUFS which is not a fatal condition. It just
* means we need to replenish buffers / switch buffer group IDs.
*/
static struct error_handler error_handlers[] = {
{ .name = "NULL", .error_fn = NULL, },
{ .name = "ACCEPT", .error_fn = default_error, },
{ .name = "SOCK", .error_fn = default_error, },
{ .name = "CONNECT", .error_fn = default_error, },
{ .name = "RECV", .error_fn = recv_error, },
{ .name = "SEND", .error_fn = default_error, },
{ .name = "SHUTDOWN", .error_fn = NULL, },
{ .name = "CANCEL", .error_fn = NULL, },
{ .name = "CLOSE", .error_fn = NULL, },
};
static int setup_listening_socket(int port)
{
struct sockaddr_in srv_addr = { };
struct sockaddr_in6 srv_addr6 = { };
int fd, enable, ret, domain;
if (ipv6)
domain = AF_INET6;
else
domain = AF_INET;
fd = socket(domain, SOCK_STREAM, 0);
if (fd == -1) {
perror("socket()");
return -1;
}
enable = 1;
ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
if (ret < 0) {
perror("setsockopt(SO_REUSEADDR)");
return -1;
}
if (ipv6) {
srv_addr6.sin6_family = AF_INET6;
srv_addr6.sin6_port = htons(port);
srv_addr6.sin6_addr = in6addr_any;
ret = bind(fd, (const struct sockaddr *)&srv_addr6, sizeof(srv_addr6));
} else {
srv_addr.sin_family = AF_INET;
srv_addr.sin_port = htons(port);
srv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
ret = bind(fd, (const struct sockaddr *)&srv_addr, sizeof(srv_addr));
}
if (ret < 0) {
perror("bind()");
return -1;
}
if (listen(fd, 1024) < 0) {
perror("listen()");
return -1;
}
return fd;
}
/*
* Setup 2 ring provided buffer rings for each connection. If we get -ENOBUFS
* on receive, we'll switch to the other ring and re-arm. If this happens
* frequently (see switch= stat), then the ring sizes are likely too small.
* Use -nXX to make them bigger.
*
* The alternative here would be to use the older style provided buffers,
* where you simply setup a buffer group and use SQEs with
* io_urign_prep_provide_buffers() to add to the pool. But that approach is
* slower and has been deprecated by using the faster ring provided buffers.
*/
static int setup_buffer_ring(struct io_uring *ring, struct conn *c, int index)
{
struct conn_buf_ring *cbr = &c->brs[index];
int ret, i;
void *ptr;
cbr->bgid = c->start_bgid + index;
if (posix_memalign(&cbr->buf, page_size, buf_size * nr_bufs)) {
perror("posix memalign");
return 1;
}
cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, 0, &ret);
if (!cbr->br) {
fprintf(stderr, "Buffer ring register failed %d\n", ret);
return 1;
}
ptr = cbr->buf;
for (i = 0; i < nr_bufs; i++) {
io_uring_buf_ring_add(cbr->br, ptr, buf_size, i, br_mask, i);
ptr += buf_size;
}
io_uring_buf_ring_advance(cbr->br, nr_bufs);
printf("%d: buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
return 0;
}
/*
* Sets up two buffer rings per connection, and we alternate between them if we
* hit -ENOBUFS on a receive. See handle_enobufs().
*/
static int setup_buffer_rings(struct io_uring *ring, struct conn *c)
{
int i;
c->start_bgid = start_bgid;
for (i = 0; i < NR_BUF_RINGS; i++) {
if (setup_buffer_ring(ring, c, i))
return 1;
}
c->cur_br = &c->brs[0];
c->cur_br_index = 0;
start_bgid += 2;
return 0;
}
static void free_buffer_rings(struct io_uring *ring, struct conn *c)
{
int i;
for (i = 0; i < NR_BUF_RINGS; i++) {
struct conn_buf_ring *cbr = &c->brs[i];
io_uring_free_buf_ring(ring, cbr->br, nr_bufs, cbr->bgid);
free(cbr->buf);
}
c->cur_br = NULL;
}
static void __show_stats(struct conn *c)
{
unsigned long msec, qps;
struct conn_dir *cd;
int i;
if (c->flags & CONN_F_STATS_SHOWN)
return;
if (!(c->flags & CONN_F_END_TIME))
gettimeofday(&c->end_time, NULL);
msec = (c->end_time.tv_sec - c->start_time.tv_sec) * 1000;
msec += (c->end_time.tv_usec - c->start_time.tv_usec) / 1000;
qps = 0;
for (i = 0; i < 2; i++)
qps += c->cd[i].rcv + c->cd[i].snd;
if (!qps)
return;
if (msec)
qps = (qps * 1000) / msec;
printf("Conn %d/(in_fd=%d, out_fd=%d): qps=%lu, msec=%lu\n", c->tid,
c->in_fd, c->out_fd, qps, msec);
for (i = 0; i < 2; i++) {
cd = &c->cd[i];
if (!cd->in_bytes && !cd->out_bytes)
continue;
printf("\t%3d: rcv=%u (short=%u), snd=%u (short=%u, busy=%u)\n",
i, cd->rcv, cd->rcv_shrt, cd->snd, cd->snd_shrt,
cd->snd_busy);
printf("\t : switch=%u, mshot_resubmit=%d\n",
cd->bgid_switch, cd->mshot_resubmit);
printf("\t : in_bytes=%lu (Kb %lu), out_bytes=%lu (Kb %lu)\n",
cd->in_bytes, cd->in_bytes >> 10,
cd->out_bytes, cd->out_bytes >> 10);
}
c->flags |= CONN_F_STATS_SHOWN;
}
static void show_stats(void)
{
float events_per_loop = 0.0;
static int stats_shown;
int i;
if (stats_shown)
return;
if (events)
events_per_loop = (float) events / (float) event_loops;
printf("Event loops: %lu, events %lu, events per loop %.2f\n", event_loops,
events, events_per_loop);
for (i = 0; i < MAX_CONNS; i++) {
struct conn *c = &conns[i];
__show_stats(c);
}
stats_shown = 1;
}
static void sig_int(int __attribute__((__unused__)) sig)
{
printf("\n");
show_stats();
exit(1);
}
/*
* Special cased for SQPOLL only, as we don't control when SQEs are consumed if
* that is used. Hence we may need to wait for the SQPOLL thread to keep up
* until we can get a new SQE. All other cases will break immediately, with a
* fresh SQE.
*
* If we grossly undersized our SQ ring, getting a NULL sqe can happen even
* for the !SQPOLL case if we're handling a lot of CQEs in our event loop
* and multishot isn't used. We can do io_uring_submit() to flush what we
* have here. Only caveat here is that if linked requests are used, SQEs
* would need to be allocated upfront as a link chain is only valid within
* a single submission cycle.
*/
static struct io_uring_sqe *get_sqe(struct io_uring *ring)
{
struct io_uring_sqe *sqe;
do {
sqe = io_uring_get_sqe(ring);
if (sqe)
break;
if (!sqpoll)
io_uring_submit(ring);
else
io_uring_sqring_wait(ring);
} while (1);
return sqe;
}
static void encode_userdata(struct io_uring_sqe *sqe, struct conn *c, int op,
int bgid, int bid, int fd)
{
__encode_userdata(sqe, c->tid, op, bgid, bid, fd);
}
/*
* Given a bgid/bid, return the buffer associated with it.
*/
static void *get_buf(struct conn *c, int bgid, int bid)
{
struct conn_buf_ring *cbr = &c->brs[bgid - c->start_bgid];
return cbr->buf + bid * buf_size;
}
static void __submit_receive(struct io_uring *ring, struct conn *c, int fd)
{
struct conn_buf_ring *cbr = c->cur_br;
struct io_uring_sqe *sqe;
vlog("%d: submit receive fd=%d\n", c->tid, fd);
/*
* For both recv and multishot receive, we use the ring provided
* buffers. These are handed to the application ahead of time, and
* are consumed when a receive triggers. Note that the address and
* length of the receive are set to NULL/0, and we assign the
* sqe->buf_group to tell the kernel which buffer group ID to pick
* a buffer from. Finally, IOSQE_BUFFER_SELECT is set to tell the
* kernel that we want a buffer picked for this request, we are not
* passing one in with the request.
*/
sqe = get_sqe(ring);
if (mshot)
io_uring_prep_recv_multishot(sqe, fd, NULL, 0, 0);
else
io_uring_prep_recv(sqe, fd, NULL, 0, 0);
encode_userdata(sqe, c, __RECV, cbr->bgid, 0, fd);
sqe->buf_group = cbr->bgid;
sqe->flags |= IOSQE_BUFFER_SELECT;
if (fixed_files)
sqe->flags |= IOSQE_FIXED_FILE;
}
/*
* One directional just arms receive on our in_fd
*/
static void submit_receive(struct io_uring *ring, struct conn *c)
{
__submit_receive(ring, c, c->in_fd);
}
/*
* Bi-directional arms receive on both in and out fd
*/
static void submit_bidi_receive(struct io_uring *ring, struct conn *c)
{
__submit_receive(ring, c, c->in_fd);
__submit_receive(ring, c, c->out_fd);
}
/*
* We hit -ENOBUFS, which means that we ran out of buffers in our current
* provided buffer group. This can happen if there's an imbalance between the
* receives coming in and the sends being processed. Switch to the other buffer
* group and continue from there, previous sends should come in and replenish the
* previous one by the time we potentially hit -ENOBUFS again.
*/
static void handle_enobufs(struct io_uring *ring, struct conn *c,
struct conn_dir *cd, int fd)
{
cd->bgid_switch++;
c->cur_br_index ^= 1;
c->cur_br = &c->brs[c->cur_br_index];
vlog("%d: enobufs: switch to bgid %d\n", c->tid, c->cur_br->bgid);
__submit_receive(ring, c, fd);
}
/*
* Kill this socket - submit a shutdown and link a close to it. We don't
* care about shutdown status, so mark it as not needing to post a CQE unless
* it fails.
*/
static void queue_shutdown_close(struct io_uring *ring, struct conn *c, int fd)
{
struct io_uring_sqe *sqe1, *sqe2;
/*
* On the off chance that we run out of SQEs after the first one,
* grab two upfront. This it to prevent our link not working if
* get_sqe() ends up doing submissions to free up an SQE, as links
* are not valid across separate submissions.
*/
sqe1 = get_sqe(ring);
sqe2 = get_sqe(ring);
io_uring_prep_shutdown(sqe1, fd, SHUT_RDWR);
if (fixed_files)
sqe1->flags |= IOSQE_FIXED_FILE;
sqe1->flags |= IOSQE_IO_LINK | IOSQE_CQE_SKIP_SUCCESS;
encode_userdata(sqe1, c, __SHUTDOWN, 0, 0, fd);
if (fixed_files)
io_uring_prep_close_direct(sqe2, fd);
else
io_uring_prep_close(sqe2, fd);
encode_userdata(sqe2, c, __CLOSE, 0, 0, fd);
}
static void queue_cancel(struct io_uring *ring, struct conn *c)
{
struct io_uring_sqe *sqe;
int flags = 0;
if (fixed_files)
flags |= IORING_ASYNC_CANCEL_FD_FIXED;
sqe = get_sqe(ring);
io_uring_prep_cancel_fd(sqe, c->in_fd, flags);
encode_userdata(sqe, c, __CANCEL, 0, 0, c->in_fd);
c->pending_cancels++;
if (c->out_fd != -1) {
sqe = get_sqe(ring);
io_uring_prep_cancel_fd(sqe, c->in_fd, flags);
encode_userdata(sqe, c, __CANCEL, 0, 0, c->in_fd);
c->pending_cancels++;
}
io_uring_submit(ring);
}
static int pending_shutdown(struct conn *c)
{
return c->cd[0].pending_shutdown + c->cd[1].pending_shutdown;
}
static bool should_shutdown(struct conn *c)
{
int i;
if (!pending_shutdown(c))
return false;
if (is_sink)
return true;
if (!bidi)
return c->cd[0].rcv == c->cd[1].snd;
for (i = 0; i < 2; i++) {
if (c->cd[0].rcv != c->cd[1].snd)
return false;
if (c->cd[1].rcv != c->cd[0].snd)
return false;
}
return true;
}
static void __close_conn(struct io_uring *ring, struct conn *c)
{
printf("Client %d: queueing shutdown\n", c->tid);
queue_cancel(ring, c);
io_uring_submit(ring);
}
static void close_cd(struct conn *c, struct conn_dir *cd)
{
if (cd->pending_sends)
return;
cd->pending_shutdown = 1;
if (!(c->flags & CONN_F_PENDING_SHUTDOWN)) {
gettimeofday(&c->end_time, NULL);
c->flags |= CONN_F_PENDING_SHUTDOWN | CONN_F_END_TIME;
}
}
/*
* We're done with this buffer, add it back to our pool so the kernel is
* free to use it again.
*/
static void replenish_buffer(struct conn *c, struct io_uring_cqe *cqe, int bid)
{
struct conn_buf_ring *cbr = &c->brs[cqe_to_bgid(cqe) - c->start_bgid];
void *this_buf;
this_buf = cbr->buf + bid * buf_size;
io_uring_buf_ring_add(cbr->br, this_buf, buf_size, bid, br_mask, 0);
io_uring_buf_ring_advance(cbr->br, 1);
}
static void __queue_send(struct io_uring *ring, struct conn *c, int fd,
void *data, int len, int bgid, int bid)
{
struct conn_dir *cd = fd_to_conn_dir(c, fd);
struct io_uring_sqe *sqe;
vlog("%d: send %d to fd %d (%p, bgid %d, bid %d)\n", c->tid, len, fd, data, bgid, bid);
sqe = get_sqe(ring);
io_uring_prep_send(sqe, fd, data, len, MSG_WAITALL | MSG_NOSIGNAL);
encode_userdata(sqe, c, __SEND, bgid, bid, fd);
if (fixed_files)
sqe->flags |= IOSQE_FIXED_FILE;
cd->pending_sends++;
}
/*
* Submit any deferred sends (see comment for defer_send()).
*/
static void submit_deferred_send(struct io_uring *ring, struct conn *c,
struct conn_dir *cd)
{
struct pending_send *ps;
if (list_empty(&cd->send_list)) {
vlog("%d: defer send %p empty\n", c->tid, cd);
return;
}
vlog("%d: queueing deferred send %p\n", c->tid, cd);
ps = list_first_entry(&cd->send_list, struct pending_send, list);
list_del(&ps->list);
__queue_send(ring, c, ps->fd, ps->data, ps->len, ps->bgid, ps->bid);
free(ps);
}
/*
* We have pending sends on this socket. Normally this is not an issue, but
* if we don't serialize sends, then we can get into a situation where the
* following can happen:
*
* 1) Submit sendA for socket1
* 2) socket1 buffer is full, poll is armed for sendA
* 3) socket1 space frees up
* 4) Poll triggers retry for sendA
* 5) Submit sendB for socket1
* 6) sendB completes
* 7) sendA is retried
*
* Regardless of the outcome of what happens with sendA in step 7 (it completes
* or it gets deferred because the socket1 buffer is now full again after sendB
* has been filled), we've now reordered the received data.
*
* This isn't a common occurence, but more likely with big buffers. If we never
* run into out-of-space in the socket, we could easily support having more than
* one send in-flight at the same time.
*
* Something to think about on the kernel side...
*/
static void defer_send(struct conn *c, struct conn_dir *cd,
struct io_uring_cqe *cqe, int bid, int out_fd)
{
void *data = get_buf(c, cqe_to_bgid(cqe), bid);
struct pending_send *ps;
vlog("%d: defer send %d to fd %d (%p, bgid %d, bid %d)\n", c->tid,
cqe->res, out_fd, data,
cqe_to_bgid(cqe), bid);
vlog("%d: pending %d, %p\n", c->tid, cd->pending_sends, cd);
cd->snd_busy++;
ps = malloc(sizeof(*ps));
ps->fd = out_fd;
ps->bgid = cqe_to_bgid(cqe);
ps->bid = bid;
ps->len = cqe->res;
ps->data = data;
list_add_tail(&ps->list, &cd->send_list);
}
/*
* Queue a send based on the data received in this cqe, which came from
* a completed receive operation.
*/
static void queue_send(struct io_uring *ring, struct conn *c,
struct io_uring_cqe *cqe, int bid, int out_fd)
{
struct conn_dir *cd = fd_to_conn_dir(c, out_fd);
int bgid = cqe_to_bgid(cqe);
if (cd->pending_sends) {
defer_send(c, cd, cqe, bid, out_fd);
} else {
void *data = get_buf(c, bgid, bid);
__queue_send(ring, c, out_fd, data, cqe->res, bgid, bid);
}
}
static int handle_accept(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct io_uring_sqe *sqe;
struct conn *c;
int domain;
if (nr_conns == MAX_CONNS) {
fprintf(stderr, "max clients reached %d\n", nr_conns);
return 1;
}
c = &conns[nr_conns];
c->tid = nr_conns++;
c->in_fd = cqe->res;
c->out_fd = -1;
gettimeofday(&c->start_time, NULL);
open_conns++;
printf("New client: id=%d, in=%d\n", c->tid, c->in_fd);
setup_buffer_rings(ring, c);
init_list_head(&c->cd[0].send_list);
init_list_head(&c->cd[1].send_list);
if (is_sink) {
submit_receive(ring, c);
return 0;
}
if (ipv6)
domain = AF_INET6;
else
domain = AF_INET;
/*
* If fixed_files is set, proxy will use fixed files for any
* new file descriptors it instantiates. Fixd files, or fixed
* descriptors, are io_uring private file descriptors. They
* cannot be accessed outside of io_uring. io_uring holds a
* fixed reference to them, which means that we do not need to
* grab per-request references to them. Particularly for
* threaded applications, grabbing and dropping file references
* for each operation can be costly as the file table is shared.
* This generally shows up as fget/fput related overhead in
* any workload profiles.
*
* Fixed descriptors are passed in via the 'fd' field just
* like regular descriptors, and then marked as such by
* setting the IOSQE_FIXED_FILE flag in the sqe->flags field.
* Some helpers do that automatically, like the below, others
* will need it set manually if they don't have a *direct*()
* helper.
*
* For operations that instantiate them, like the opening of
* a direct socket, the application may either ask the kernel
* to find a free one (as is done below), or the application
* may manage the space itself and pass in an index for a
* currently free slot in the table. If the kernel is asked
* to allocate a free direct descriptor, note that io_uring
* does not abide by the POSIX mandated "lowest free must be
* returned". It may return any free descriptor of its
* choosing.
*/
sqe = get_sqe(ring);
if (fixed_files)
io_uring_prep_socket_direct_alloc(sqe, domain, SOCK_STREAM, 0, 0);
else
io_uring_prep_socket(sqe, domain, SOCK_STREAM, 0, 0);
encode_userdata(sqe, c, __SOCK, 0, 0, 0);
return 0;
}
static int handle_sock(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
struct io_uring_sqe *sqe;
int ret;
vlog("%d: sock: res=%d\n", c->tid, cqe->res);
c->out_fd = cqe->res;
if (ipv6) {
memset(&c->addr6, 0, sizeof(c->addr6));
c->addr6.sin6_family = AF_INET6;
c->addr6.sin6_port = htons(send_port);
ret = inet_pton(AF_INET6, host, &c->addr6.sin6_addr);
} else {
memset(&c->addr, 0, sizeof(c->addr));
c->addr.sin_family = AF_INET;
c->addr.sin_port = htons(send_port);
ret = inet_pton(AF_INET, host, &c->addr.sin_addr);
}
if (ret <= 0) {
if (!ret)
fprintf(stderr, "host not in right format\n");
else
perror("inet_pton");
return 1;
}
sqe = get_sqe(ring);
if (ipv6) {
io_uring_prep_connect(sqe, c->out_fd,
(struct sockaddr *) &c->addr6,
sizeof(c->addr6));
} else {
io_uring_prep_connect(sqe, c->out_fd,
(struct sockaddr *) &c->addr,
sizeof(c->addr));
}
encode_userdata(sqe, c, __CONNECT, 0, 0, c->out_fd);
if (fixed_files)
sqe->flags |= IOSQE_FIXED_FILE;
return 0;
}
static int handle_connect(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
if (bidi)
submit_bidi_receive(ring, c);
else
submit_receive(ring, c);
return 0;
}
static int __handle_recv(struct io_uring *ring, struct conn *c,
struct io_uring_cqe *cqe, int in_fd, int out_fd)
{
struct conn_dir *cd = fd_to_conn_dir(c, in_fd);
int bid, do_recv = !mshot;
/*
* Not having a buffer attached should only happen if we get a zero
* sized receive, because the other end closed the connection. It
* cannot happen otherwise, as all our receives are using provided
* buffers and hence it's not possible to return a CQE with a non-zero
* result and not have a buffer attached.
*/
if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
if (!cqe->res) {
close_cd(c, cd);
return 0;
}
fprintf(stderr, "no buffer assigned, res=%d\n", cqe->res);
return 1;
}
cd->rcv++;
if (cqe->res != buf_size)
cd->rcv_shrt++;
/*
* If multishot terminates, just submit a new one.
*/
if (mshot && !(cqe->flags & IORING_CQE_F_MORE)) {
cd->mshot_resubmit++;
do_recv = 1;
}
bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
vlog("%d: recv: bid=%d, bgid=%d, res=%d\n", c->tid, bid, cqe_to_bgid(cqe), cqe->res);
/*
* If we're a sink, we're done here. Just replenish the buffer back
* to the pool. For proxy mode, we will send the data to the other
* end and the buffer will be replenished once the send is done with
* it.
*/
if (is_sink)
replenish_buffer(c, cqe, bid);
else
queue_send(ring, c, cqe, bid, out_fd);
cd->in_bytes += cqe->res;
/*
* If we're not doing multishot receive, or if multishot receive
* terminated, we need to submit a new receive request as this one
* has completed. Multishot will stay armed.
*/
if (do_recv)
__submit_receive(ring, c, in_fd);
return 0;
}
static int handle_recv(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
int fd = cqe_to_fd(cqe);
if (fd == c->in_fd)
return __handle_recv(ring, c, cqe, c->in_fd, c->out_fd);
return __handle_recv(ring, c, cqe, c->out_fd, c->in_fd);
}
static int recv_error(struct error_handler *err, struct io_uring *ring,
struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
int in_fd, fd = cqe_to_fd(cqe);
if (cqe->res != -ENOBUFS)
return default_error(err, ring, cqe);
if (fd == c->in_fd)
in_fd = c->in_fd;
else
in_fd = c->out_fd;
handle_enobufs(ring, c, fd_to_conn_dir(c, in_fd), in_fd);
return 0;
}
static int handle_send(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
int fd = cqe_to_fd(cqe);
struct conn_dir *cd = fd_to_conn_dir(c, fd);
cd->snd++;
cd->out_bytes += cqe->res;
if (cqe->res != buf_size)
cd->snd_shrt++;
vlog("%d: send: bid=%d, bgid=%d, res=%d\n", c->tid, cqe_to_bid(cqe),
cqe_to_bgid(cqe), cqe->res);
/*
* Find the provided buffer that the receive consumed, and
* which we then used for the send, and add it back to the
* pool so it can get picked by another receive. Once the send
* is done, we're done with it.
*/
replenish_buffer(c, cqe, cqe_to_bid(cqe));
cd->pending_sends--;
vlog("%d: pending sends %d\n", c->tid, cd->pending_sends);
if (!cd->pending_sends) {
if (!cqe->res)
close_cd(c, cd);
else
submit_deferred_send(ring, c, cd);
}
return 0;
}
/*
* We don't expect to get here, as we marked it with skipping posting a
* CQE if it was successful. If it does trigger, than means it fails and
* that our close has not been done. Log the shutdown error and issue a new
* separate close.
*/
static int handle_shutdown(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
struct io_uring_sqe *sqe;
int fd = cqe_to_fd(cqe);
fprintf(stderr, "Got shutdown notication on fd %d\n", fd);
if (!cqe->res)
fprintf(stderr, "Unexpected success shutdown CQE\n");
else if (cqe->res < 0)
fprintf(stderr, "Shutdown got %s\n", strerror(-cqe->res));
sqe = get_sqe(ring);
if (fixed_files)
io_uring_prep_close_direct(sqe, fd);
else
io_uring_prep_close(sqe, fd);
encode_userdata(sqe, c, __CLOSE, 0, 0, fd);
return 0;
}
/*
* Final stage of a connection, the shutdown and close has finished. Mark
* it as disconnected and let the main loop reap it.
*/
static int handle_close(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
int fd = cqe_to_fd(cqe);
c->flags |= CONN_F_DISCONNECTED;
printf("Closed client: id=%d, in_fd=%d, out_fd=%d\n", c->tid, c->in_fd, c->out_fd);
if (fd == c->in_fd)
c->in_fd = -1;
else if (fd == c->out_fd)
c->out_fd = -1;
if (c->in_fd == -1 && c->out_fd == -1) {
__show_stats(c);
open_conns--;
free_buffer_rings(ring, c);
}
return 0;
}
static int handle_cancel(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
int fd = cqe_to_fd(cqe);
c->pending_cancels--;
vlog("%d: got cancel fd %d, refs %d\n", c->tid, fd, c->pending_cancels);
if (!c->pending_cancels) {
queue_shutdown_close(ring, c, c->in_fd);
if (c->out_fd != -1)
queue_shutdown_close(ring, c, c->out_fd);
io_uring_submit(ring);
}
return 0;
}
/*
* Called for each CQE that we receive. Decode the request type that it
* came from, and call the appropriate handler.
*/
static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
{
int ret;
/*
* Unlikely, but there's an error in this CQE. If an error handler
* is defined, call it, and that will deal with it. If no error
* handler is defined, the opcode handler either doesn't care or will
* handle it on its own.
*/
if (cqe->res < 0) {
struct error_handler *err = &error_handlers[cqe_to_op(cqe)];
if (err->error_fn)
return err->error_fn(err, ring, cqe);
}
switch (cqe_to_op(cqe)) {
case __ACCEPT:
ret = handle_accept(ring, cqe);
break;
case __SOCK:
ret = handle_sock(ring, cqe);
break;
case __CONNECT:
ret = handle_connect(ring, cqe);
break;
case __RECV:
ret = handle_recv(ring, cqe);
break;
case __SEND:
ret = handle_send(ring, cqe);
break;
case __CANCEL:
ret = handle_cancel(ring, cqe);
break;
case __SHUTDOWN:
ret = handle_shutdown(ring, cqe);
break;
case __CLOSE:
ret = handle_close(ring, cqe);
break;
default:
fprintf(stderr, "bad user data %lx\n", (long) cqe->user_data);
return 1;
}
return ret;
}
static void usage(const char *name)
{
printf("%s:\n", name);
printf("\t-m:\t\tUse multishot receive (%d)\n", mshot);
printf("\t-d:\t\tUse DEFER_TASKRUN (%d)\n", defer_tw);
printf("\t-S:\t\tUse SQPOLL (%d)\n", sqpoll);
printf("\t-b:\t\tSend/receive buf size (%d)\n", buf_size);
printf("\t-n:\t\tNumber of provided buffers (pow2) (%d)\n", nr_bufs);
printf("\t-w:\t\tNumber of CQEs to wait for each loop (%d)\n", wait_batch);
printf("\t-t:\t\tTimeout for waiting on CQEs (usec) (%d)\n", wait_usec);
printf("\t-s:\t\tAct only as a sink (%d)\n", is_sink);
printf("\t-f:\t\tUse only fixed files (%d)\n", fixed_files);
printf("\t-B:\t\tUse bi-directional mode (%d)\n", bidi);
printf("\t-H:\t\tHost to connect to (%s)\n", host);
printf("\t-r:\t\tPort to receive on (%d)\n", receive_port);
printf("\t-p:\t\tPort to connect to (%d)\n", send_port);
printf("\t-6:\t\tUse IPv6 (%d)\n", ipv6);
printf("\t-N:\t\tUse NAPI polling (%d)\n", napi);
printf("\t-T:\t\tNAPI timeout (usec) (%d)\n", napi_timeout);
printf("\t-V:\t\tIncrease verbosity (%d)\n", verbose);
}
static void check_for_close(struct io_uring *ring)
{
int i;
for (i = 0; i < nr_conns; i++) {
struct conn *c = &conns[i];
if (c->flags & (CONN_F_DISCONNECTING | CONN_F_DISCONNECTED))
continue;
if (should_shutdown(c)) {
__close_conn(ring, c);
c->flags |= CONN_F_DISCONNECTING;
}
}
}
/*
* Main event loop, Submit our multishot accept request, and then just loop
* around handling incoming events.
*/
static int event_loop(struct io_uring *ring, int fd)
{
struct __kernel_timespec active_ts, idle_ts = { .tv_sec = 1, };
struct io_uring_sqe *sqe;
int flags;
/*
* proxy provides a way to use either multishot receive or not, but
* for accept, we always use multishot. A multishot accept request
* needs only be armed once, and then it'll trigger a completion and
* post a CQE whenever a new connection is accepted. No need to do
* anything else, unless the multishot accept terminates. This happens
* if it encounters an error. Applications should check for
* IORING_CQE_F_MORE in cqe->flags - this tells you if more completions
* are expected from this request or not. Non-multishot never have
* this set, where multishot will always have this set unless an error
* occurs.
*/
sqe = get_sqe(ring);
if (fixed_files)
io_uring_prep_multishot_accept_direct(sqe, fd, NULL, NULL, 0);
else
io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
__encode_userdata(sqe, 0, __ACCEPT, 0, 0, fd);
if (wait_usec > 1000000) {
active_ts.tv_sec = wait_usec / 1000000;
wait_usec -= active_ts.tv_sec * 1000000;
}
active_ts.tv_nsec = wait_usec * 1000;
flags = 0;
while (1) {
struct __kernel_timespec *ts = &idle_ts;
struct io_uring_cqe *cqe;
unsigned int head;
int i, to_wait;
/*
* If wait_batch is set higher than 1, then we'll wait on
* that amount of CQEs to be posted each loop. If used with
* DEFER_TASKRUN, this can provide a substantial reduction
* in context switch rate as the task isn't woken until the
* requested number of events can be returned.
*
* Can be used with -t to set a wait_usec timeout as well.
* For example, if an application can deal with 250 usec
* of wait latencies, it can set -w8 -t250 which will cause
* io_uring to return when either 8 events have been received,
* or if 250 usec of waiting has passed.
*
* If we don't have any open connections, wait on just 1
* always.
*/
to_wait = 1;
if (open_conns && !flags) {
ts = &active_ts;
to_wait = open_conns * wait_batch;
}
io_uring_submit_and_wait_timeout(ring, &cqe, to_wait, ts, NULL);
i = flags = 0;
io_uring_for_each_cqe(ring, head, cqe) {
if (handle_cqe(ring, cqe))
return 1;
flags |= cqe_to_conn(cqe)->flags;
++i;
}
/*
* Advance the CQ ring for seen events when we've processed
* all of them in this loop. This can also be done with
* io_uring_cqe_seen() in each handler above, which just marks
* that single CQE as seen. However, it's more efficient to
* mark a batch as seen when we're done with that batch.
*/
if (i)
io_uring_cq_advance(ring, i);
if (!i || (flags & (CONN_F_PENDING_SHUTDOWN)))
check_for_close(ring);
event_loops++;
events += i;
}
return 0;
}
/*
* Options parsing the ring / net setup
*/
int main(int argc, char *argv[])
{
struct io_uring ring;
struct io_uring_params params;
struct sigaction sa = { };
int opt, ret, fd;
page_size = sysconf(_SC_PAGESIZE);
if (page_size < 0) {
perror("sysconf(_SC_PAGESIZE)");
return 1;
}
while ((opt = getopt(argc, argv, "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:6Vh?")) != -1) {
switch (opt) {
case 'm':
mshot = !!atoi(optarg);
break;
case 'S':
sqpoll = !!atoi(optarg);
break;
case 'd':
defer_tw = !!atoi(optarg);
break;
case 'b':
buf_size = atoi(optarg);
break;
case 'n':
nr_bufs = atoi(optarg);
break;
case 'w':
wait_batch = atoi(optarg);
break;
case 't':
wait_usec = atoi(optarg);
break;
case 's':
is_sink = !!atoi(optarg);
break;
case 'f':
fixed_files = !!atoi(optarg);
break;
case 'H':
host = strdup(optarg);
break;
case 'r':
receive_port = atoi(optarg);
break;
case 'p':
send_port = atoi(optarg);
break;
case 'B':
bidi = !!atoi(optarg);
break;
case 'N':
napi = !!atoi(optarg);
break;
case 'T':
napi_timeout = atoi(optarg);
break;
case '6':
ipv6 = true;
break;
case 'V':
verbose++;
break;
case 'h':
default:
usage(argv[0]);
return 1;
}
}
if (bidi && is_sink) {
fprintf(stderr, "Can't be both bidi proxy and sink\n");
return 1;
}
br_mask = nr_bufs - 1;
fd = setup_listening_socket(receive_port);
if (is_sink)
send_port = -1;
if (fd == -1)
return 1;
atexit(show_stats);
sa.sa_handler = sig_int;
sa.sa_flags = SA_RESTART;
sigaction(SIGINT, &sa, NULL);
/*
* By default, set us up with a big CQ ring. Not strictly needed
* here, but it's very important to never overflow the CQ ring.
* Events will not be dropped if this happens, but it does slow
* the application down in dealing with overflown events.
*
* Set SINGLE_ISSUER, which tells the kernel that only one thread
* is doing IO submissions. This enables certain optimizations in
* the kernel.
*/
memset(&params, 0, sizeof(params));
params.flags |= IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_CLAMP;
params.flags |= IORING_SETUP_CQSIZE;
params.cq_entries = 131072;
/*
* DEFER_TASKRUN decouples async event reaping and retrying from
* regular system calls. If this isn't set, then io_uring uses
* normal task_work for this. task_work is always being run on any
* exit to userspace. Real applications do more than just call IO
* related system calls, and hence we can be running this work way
* too often. Using DEFER_TASKRUN defers any task_work running to
* when the application enters the kernel anyway to wait on new
* events. It's generally the preferred and recommended way to setup
* a ring.
*/
if (defer_tw) {
params.flags |= IORING_SETUP_DEFER_TASKRUN;
sqpoll = 0;
}
/*
* SQPOLL offloads any request submission and retry operations to a
* dedicated thread. This enables an application to do IO without
* ever having to enter the kernel itself. The SQPOLL thread will
* stay busy as long as there's work to do, and go to sleep if
* sq_thread_idle msecs have passed. If it's running, submitting new
* IO just needs to make them visible to the SQPOLL thread, it needs
* not enter the kernel. For submission, the application will only
* enter the kernel if the SQPOLL has been idle long enough that it
* has gone to sleep.
*
* Waiting on events still need to enter the kernel, if none are
* available. The application may also use io_uring_peek_cqe() to
* check for new events without entering the kernel, as completions
* will be continually produced to the CQ ring by the SQPOLL thread
* as they occur.
*/
if (sqpoll) {
params.flags |= IORING_SETUP_SQPOLL;
params.sq_thread_idle = 1000;
defer_tw = 0;
}
/*
* If neither DEFER_TASKRUN or SQPOLL is used, set COOP_TASKRUN. This
* avoids heavy signal based notifications, which can force an
* application to enter the kernel and process it as soon as they
* occur.
*/
if (!sqpoll && !defer_tw)
params.flags |= IORING_SETUP_COOP_TASKRUN;
/*
* The SQ ring size need not be larger than any batch of requests
* that need to be prepared before submit. Normally in a loop we'd
* only need a few, if any, particularly if multishot is used.
*/
ret = io_uring_queue_init_params(128, &ring, &params);
if (ret) {
fprintf(stderr, "%s\n", strerror(-ret));
return 1;
}
if (fixed_files) {
/*
* If fixed files are used, we need to allocate a fixed file
* table upfront where new direct descriptors can be managed.
*/
ret = io_uring_register_files_sparse(&ring, 4096);
if (ret) {
fprintf(stderr, "file register: %d\n", ret);
return 1;
}
/*
* If fixed files are used, we also register the ring fd. See
* comment near io_uring_prep_socket_direct_alloc() further
* down. This avoids the fget/fput overhead associated with
* the io_uring_enter(2) system call itself, which is used to
* submit and wait on events.
*/
ret = io_uring_register_ring_fd(&ring);
if (ret != 1) {
fprintf(stderr, "ring register: %d\n", ret);
return 1;
}
}
if (napi) {
struct io_uring_napi n = {
.prefer_busy_poll = napi > 1 ? 1 : 0,
.busy_poll_to = napi_timeout,
};
ret = io_uring_register_napi(&ring, &n);
if (ret) {
fprintf(stderr, "io_uring_register_napi: %d\n", ret);
if (ret != -EINVAL)
return 1;
fprintf(stderr, "NAPI not available, turned off\n");
}
}
printf("Backend: multishot=%d, sqpoll=%d, defer_tw=%d, fixed_files=%d "
"is_sink=%d, buf_size=%d, nr_bufs=%d, host=%s, send_port=%d "
"receive_port=%d, napi=%d, napi_timeout=%d\n",
mshot, sqpoll, defer_tw, fixed_files, is_sink,
buf_size, nr_bufs, host, send_port, receive_port,
napi, napi_timeout);
return event_loop(&ring, fd);
}