/* SPDX-License-Identifier: MIT */
/*
* Sample program that can act either as a packet sink, where it just receives
* packets and doesn't do anything with them, or as a proxy, where it receives
* packets and then sends them on to a new destination. The proxy can be
* unidirectional (-B0) or bi-directional (-B1).
*
* Examples:
*
* Act as a proxy, listening on port 4444, and send data to 192.168.2.6 on port
* 4445. Use multishot receive, DEFER_TASKRUN, and fixed files
*
* ./proxy -m1 -d1 -f1 -r4444 -H 192.168.2.6 -p4445
*
*
* Same as above, but utilize send bundles (-C1, requires -u1 send_ring) as well
* with ring provided send buffers.
*
* ./proxy -m1 -d1 -f1 -u1 -C1 -r4444 -H 192.168.2.6 -p4445
*
* Act as a bi-directional proxy, listening on port 8888, and send data back
* and forth between host and 192.168.2.6 on port 22. Use multishot receive,
* DEFER_TASKRUN, fixed files, and buffers of size 1500.
*
* ./proxy -m1 -d1 -f1 -B1 -b1500 -r8888 -H 192.168.2.6 -p22
*
* Act as a sink, listening on port 4445, using multishot receive, DEFER_TASKRUN,
* and fixed files:
*
* ./proxy -m1 -d1 -s1 -f1 -r4445
*
* Run with -h to see a list of options, and their defaults.
*
* (C) 2024 Jens Axboe <axboe@kernel.dk>
*
*/
#include <fcntl.h>
#include <stdint.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
#include <assert.h>
#include <liburing.h>
#include "proxy.h"
#include "list.h"
#include "helpers.h"
/*
* Will go away once/if bundles are upstreamed and we put the generic
* definitions in the kernel header.
*/
#ifndef IORING_RECVSEND_BUNDLE
#define IORING_RECVSEND_BUNDLE (1U << 4)
#endif
#ifndef IORING_FEAT_SEND_BUF_SELECT
#define IORING_FEAT_SEND_BUF_SELECT (1U << 14)
#endif
static int cur_bgid = 1;
static int nr_conns;
static int open_conns;
static long page_size;
static unsigned long event_loops;
static unsigned long events;
static int recv_mshot = 1;
static int sqpoll;
static int defer_tw = 1;
static int is_sink;
static int fixed_files = 1;
static char *host = "192.168.2.6";
static int send_port = 4445;
static int receive_port = 4444;
static int buf_size = 32;
static int bidi;
static int ipv6;
static int napi;
static int napi_timeout;
static int wait_batch = 1;
static int wait_usec = 1000000;
static int rcv_msg;
static int snd_msg;
static int send_ring = -1;
static int snd_bundle;
static int rcv_bundle;
static int verbose;
static int nr_bufs = 256;
static int br_mask;
static int ring_size = 128;
struct pending_send {
struct list_head list;
int fd, bid, len;
void *data;
};
/*
* For sendmsg/recvmsg. recvmsg just has a single vec, sendmsg will have
* two vecs - one that is currently being sent, and one that is being
* prepared. When a new sendmsg is issued, we'll swap which one we use.
*/
struct msg_vec {
struct iovec *iov;
/* length of allocated vec */
int vec_size;
/* length currently being used */
int iov_len;
/* only for send, current index we're processing */
int cur_iov;
};
struct io_msg {
struct msghdr msg;
struct msg_vec vecs[2];
int vec_index;
};
/*
* Per socket stats per connection. For bi-directional, we'll have both
* sends and receives on each socket; this helps track them separately.
* For sink or one directional, each of the two stats will be only sends
* or receives, not both.
*/
struct conn_dir {
int index;
int pending_shutdown;
int pending_send;
int pending_recv;
struct list_head send_list;
int out_buffers;
int rcv, rcv_shrt, rcv_enobufs, rcv_mshot;
int snd, snd_shrt, snd_enobufs, snd_busy, snd_mshot;
int rcv_need_rearm;
int rcv_rearm;
int snd_next_bid;
int rcv_next_bid;
unsigned long in_bytes, out_bytes;
/* only ever have a single recv pending */
struct io_msg io_rcv_msg;
/* one send that is inflight, and one being prepared for the next one */
struct io_msg io_snd_msg;
};
enum {
CONN_F_DISCONNECTING = 1,
CONN_F_DISCONNECTED = 2,
CONN_F_PENDING_SHUTDOWN = 4,
CONN_F_STATS_SHOWN = 8,
CONN_F_END_TIME = 16,
};
/*
* buffer ring belonging to a connection
*/
struct conn_buf_ring {
struct io_uring_buf_ring *br;
void *buf;
int bgid;
};
struct conn {
struct conn_buf_ring in_br;
struct conn_buf_ring out_br;
int tid;
int in_fd, out_fd;
int pending_cancels;
int flags;
struct conn_dir cd[2];
struct timeval start_time, end_time;
union {
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
};
};
#define MAX_CONNS 1024
static struct conn conns[MAX_CONNS];
#define vlog(str, ...) do { \
if (verbose) \
printf(str, ##__VA_ARGS__); \
} while (0)
static void prep_next_send(struct io_uring *ring, struct conn *c,
struct conn_dir *cd, int fd);
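/*
* Decode which connection a CQE belongs to: the tid stored in the CQE
* user_data indexes directly into the conns[] array.
*/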
static struct conn *cqe_to_conn(struct io_uring_cqe *cqe)
{
struct userdata ud = { .val = cqe->user_data };
return &conns[ud.op_tid & TID_MASK];
}
static struct conn_dir *cqe_to_conn_dir(struct conn *c,
struct io_uring_cqe *cqe)
{
int fd = cqe_to_fd(cqe);
return &c->cd[fd != c->in_fd];
}
static int other_dir_fd(struct conn *c, int fd)
{
if (c->in_fd == fd)
return c->out_fd;
return c->in_fd;
}
static struct msg_vec *msg_vec(struct io_msg *imsg)
{
return &imsg->vecs[imsg->vec_index];
}
static struct msg_vec *snd_msg_vec(struct conn_dir *cd)
{
return msg_vec(&cd->io_snd_msg);
}
/*
* Goes from accept new connection -> create socket, connect to end
* point, prepare recv, on receive do send (unless sink). If either ends
* disconnects, we transition to shutdown and then close.
*/
enum {
__ACCEPT = 1,
__SOCK = 2,
__CONNECT = 3,
__RECV = 4,
__SEND = 5,
__SHUTDOWN = 6,
__CANCEL = 7,
__CLOSE = 8,
};
struct error_handler {
const char *name;
int (*error_fn)(struct error_handler *, struct io_uring *, struct io_uring_cqe *);
};
static int recv_error(struct error_handler *err, struct io_uring *ring,
struct io_uring_cqe *cqe);
static int send_error(struct error_handler *err, struct io_uring *ring,
struct io_uring_cqe *cqe);
static int default_error(struct error_handler *err,
struct io_uring __attribute__((__unused__)) *ring,
struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
fprintf(stderr, "%d: %s error %s\n", c->tid, err->name, strerror(-cqe->res));
fprintf(stderr, "fd=%d, bid=%d\n", cqe_to_fd(cqe), cqe_to_bid(cqe));
return 1;
}
/*
* Move error handling out of the normal handling path, cleanly separating
* them. If an opcode doesn't need any error handling, set it to NULL. If
* it wants to stop the connection at that point and not do anything else,
* then the default handler can be used. Only receive has proper error
* handling, as we can get -ENOBUFS which is not a fatal condition. It just
* means we need to wait on buffer replenishing before re-arming the receive.
*/
static struct error_handler error_handlers[] = {
{ .name = "NULL", .error_fn = NULL, },
{ .name = "ACCEPT", .error_fn = default_error, },
{ .name = "SOCK", .error_fn = default_error, },
{ .name = "CONNECT", .error_fn = default_error, },
{ .name = "RECV", .error_fn = recv_error, },
{ .name = "SEND", .error_fn = send_error, },
{ .name = "SHUTDOWN", .error_fn = NULL, },
{ .name = "CANCEL", .error_fn = NULL, },
{ .name = "CLOSE", .error_fn = NULL, },
};
static void free_buffer_ring(struct io_uring *ring, struct conn_buf_ring *cbr)
{
if (!cbr->br)
return;
io_uring_free_buf_ring(ring, cbr->br, nr_bufs, cbr->bgid);
cbr->br = NULL;
free(cbr->buf);
}
static void free_buffer_rings(struct io_uring *ring, struct conn *c)
{
free_buffer_ring(ring, &c->in_br);
free_buffer_ring(ring, &c->out_br);
}
/*
* Setup a ring provided buffer ring for each connection. If we get -ENOBUFS
* on receive, for multishot receive we'll wait for half the provided buffers
* to be returned by pending sends, then re-arm the multishot receive. If
* this happens too frequently (see enobufs= stat), then the ring size is
* likely too small. Use -nXX to make it bigger. See recv_enobufs().
*
* The alternative here would be to use the older style provided buffers,
* where you simply setup a buffer group and use SQEs with
* io_uring_prep_provide_buffers() to add to the pool. But that approach is
* slower and has been deprecated in favor of the faster ring provided buffers.
*/
static int setup_recv_ring(struct io_uring *ring, struct conn *c)
{
struct conn_buf_ring *cbr = &c->in_br;
int ret, i;
void *ptr;
cbr->buf = NULL;
if (posix_memalign(&cbr->buf, page_size, buf_size * nr_bufs)) {
perror("posix memalign");
return 1;
}
cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, 0, &ret);
if (!cbr->br) {
fprintf(stderr, "Buffer ring register failed %d\n", ret);
return 1;
}
ptr = cbr->buf;
for (i = 0; i < nr_bufs; i++) {
vlog("%d: add bid %d, data %p\n", c->tid, i, ptr);
io_uring_buf_ring_add(cbr->br, ptr, buf_size, i, br_mask, i);
ptr += buf_size;
}
io_uring_buf_ring_advance(cbr->br, nr_bufs);
printf("%d: recv buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
return 0;
}
/*
* If 'send_ring' is used and the kernel supports it, we can skip serializing
* sends as the data will be ordered regardless. This reduces the send handling
* complexity, as buffers can always be added to the outgoing ring and will be
* processed in the order in which they were added.
*/
static int setup_send_ring(struct io_uring *ring, struct conn *c)
{
struct conn_buf_ring *cbr = &c->out_br;
int ret;
cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, 0, &ret);
if (!cbr->br) {
fprintf(stderr, "Buffer ring register failed %d\n", ret);
return 1;
}
printf("%d: send buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
return 0;
}
/*
* Setup an input and output buffer ring
*/
static int setup_buffer_rings(struct io_uring *ring, struct conn *c)
{
int ret;
c->in_br.bgid = cur_bgid++;
c->out_br.bgid = cur_bgid++;
c->out_br.br = NULL;
ret = setup_recv_ring(ring, c);
if (ret)
return ret;
if (is_sink || !send_ring)
return 0;
ret = setup_send_ring(ring, c);
if (ret) {
free_buffer_ring(ring, &c->in_br);
return ret;
}
return 0;
}
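/*
* Show the stats for a single connection: duration, requests per second,
* and the per-direction receive/send/byte counters. Only shown once per
* connection.
*/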
static void __show_stats(struct conn *c)
{
unsigned long msec, qps;
struct conn_dir *cd;
int i;
if (c->flags & CONN_F_STATS_SHOWN)
return;
if (!(c->flags & CONN_F_END_TIME))
gettimeofday(&c->end_time, NULL);
msec = (c->end_time.tv_sec - c->start_time.tv_sec) * 1000;
msec += (c->end_time.tv_usec - c->start_time.tv_usec) / 1000;
qps = 0;
for (i = 0; i < 2; i++)
qps += c->cd[i].rcv + c->cd[i].snd;
if (!qps)
return;
if (msec)
qps = (qps * 1000) / msec;
printf("Conn %d/(in_fd=%d, out_fd=%d): qps=%lu, msec=%lu\n", c->tid,
c->in_fd, c->out_fd, qps, msec);
for (i = 0; i < 2; i++) {
cd = &c->cd[i];
if (!cd->in_bytes && !cd->out_bytes && !cd->snd && !cd->rcv)
continue;
printf("\t%3d: rcv=%u (short=%u, enobufs=%d), snd=%u (short=%u,"
" busy=%u, enobufs=%d)\n", i, cd->rcv, cd->rcv_shrt,
cd->rcv_enobufs, cd->snd, cd->snd_shrt, cd->snd_busy,
cd->snd_enobufs);
printf("\t : in_bytes=%lu (Kb %lu), out_bytes=%lu (Kb %lu)\n",
cd->in_bytes, cd->in_bytes >> 10,
cd->out_bytes, cd->out_bytes >> 10);
printf("\t : mshot_rcv=%d, mshot_snd=%d\n", cd->rcv_mshot,
cd->snd_mshot);
}
c->flags |= CONN_F_STATS_SHOWN;
}
static void show_stats(void)
{
float events_per_loop = 0.0;
static int stats_shown;
int i;
if (stats_shown)
return;
if (events)
events_per_loop = (float) events / (float) event_loops;
printf("Event loops: %lu, events %lu, events per loop %.2f\n", event_loops,
events, events_per_loop);
for (i = 0; i < MAX_CONNS; i++) {
struct conn *c = &conns[i];
__show_stats(c);
}
stats_shown = 1;
}
static void sig_int(int __attribute__((__unused__)) sig)
{
printf("\n");
show_stats();
exit(1);
}
/*
* Special cased for SQPOLL only, as we don't control when SQEs are consumed if
* that is used. Hence we may need to wait for the SQPOLL thread to keep up
* until we can get a new SQE. All other cases will break immediately, with a
* fresh SQE.
*
* If we grossly undersized our SQ ring, getting a NULL sqe can happen even
* for the !SQPOLL case if we're handling a lot of CQEs in our event loop
* and multishot isn't used. We can do io_uring_submit() to flush what we
* have here. Only caveat here is that if linked requests are used, SQEs
* would need to be allocated upfront as a link chain is only valid within
* a single submission cycle.
*/
static struct io_uring_sqe *get_sqe(struct io_uring *ring)
{
struct io_uring_sqe *sqe;
do {
sqe = io_uring_get_sqe(ring);
if (sqe)
break;
if (!sqpoll)
io_uring_submit(ring);
else
io_uring_sqring_wait(ring);
} while (1);
return sqe;
}
static void encode_userdata(struct io_uring_sqe *sqe, struct conn *c, int op,
int bid, int fd)
{
__encode_userdata(sqe, c->tid, op, bid, fd);
}
static void __submit_receive(struct io_uring *ring, struct conn *c,
struct conn_dir *cd, int fd)
{
struct conn_buf_ring *cbr = &c->in_br;
struct io_uring_sqe *sqe;
vlog("%d: submit receive fd=%d\n", c->tid, fd);
assert(!cd->pending_recv);
cd->pending_recv = 1;
/*
* For both recv and multishot receive, we use the ring provided
* buffers. These are handed to the application ahead of time, and
* are consumed when a receive triggers. Note that the address and
* length of the receive are set to NULL/0, and we assign the
* sqe->buf_group to tell the kernel which buffer group ID to pick
* a buffer from. Finally, IOSQE_BUFFER_SELECT is set to tell the
* kernel that we want a buffer picked for this request, we are not
* passing one in with the request.
*/
sqe = get_sqe(ring);
if (rcv_msg) {
struct io_msg *imsg = &cd->io_rcv_msg;
struct msghdr *msg = &imsg->msg;
memset(msg, 0, sizeof(*msg));
msg->msg_iov = msg_vec(imsg)->iov;
msg->msg_iovlen = msg_vec(imsg)->iov_len;
if (recv_mshot) {
cd->rcv_mshot++;
io_uring_prep_recvmsg_multishot(sqe, fd, &imsg->msg, 0);
} else {
io_uring_prep_recvmsg(sqe, fd, &imsg->msg, 0);
}
} else {
if (recv_mshot) {
cd->rcv_mshot++;
io_uring_prep_recv_multishot(sqe, fd, NULL, 0, 0);
} else {
io_uring_prep_recv(sqe, fd, NULL, 0, 0);
}
}
encode_userdata(sqe, c, __RECV, 0, fd);
sqe->buf_group = cbr->bgid;
sqe->flags |= IOSQE_BUFFER_SELECT;
if (fixed_files)
sqe->flags |= IOSQE_FIXED_FILE;
if (rcv_bundle)
sqe->ioprio |= IORING_RECVSEND_BUNDLE;
}
/*
* One directional just arms receive on our in_fd
*/
static void submit_receive(struct io_uring *ring, struct conn *c)
{
__submit_receive(ring, c, &c->cd[0], c->in_fd);
}
/*
* Bi-directional arms receive on both in and out fd
*/
static void submit_bidi_receive(struct io_uring *ring, struct conn *c)
{
__submit_receive(ring, c, &c->cd[0], c->in_fd);
__submit_receive(ring, c, &c->cd[1], c->out_fd);
}
/*
* We hit -ENOBUFS, which means that we ran out of buffers in our current
* provided buffer group. This can happen if there's an imbalance between the
* receives coming in and the sends being processed, particularly with multishot
* receive as they can trigger very quickly. If this happens, defer arming a
* new receive until we've replenished half of the buffer pool by processing
* pending sends.
*/
static void recv_enobufs(struct io_uring *ring, struct conn *c,
struct conn_dir *cd, int fd)
{
vlog("%d: enobufs hit\n", c->tid);
cd->rcv_enobufs++;
/*
* If we're a sink, mark rcv as rearm. If we're not, then mark us as
* needing a rearm for receive and send. The completing send will
* kick the recv rearm.
*/
if (!is_sink) {
cd->rcv_need_rearm = 1;
prep_next_send(ring, c, cd, fd);
} else {
cd->rcv_rearm = 1;
}
}
/*
* Kill this socket - submit a shutdown and link a close to it. We don't
* care about shutdown status, so mark it as not needing to post a CQE unless
* it fails.
*/
static void queue_shutdown_close(struct io_uring *ring, struct conn *c, int fd)
{
struct io_uring_sqe *sqe1, *sqe2;
/*
* On the off chance that we run out of SQEs after the first one,
* grab two upfront. This is to prevent the link from breaking if
* get_sqe() ends up doing submissions to free up an SQE, as links
* are not valid across separate submissions.
*/
sqe1 = get_sqe(ring);
sqe2 = get_sqe(ring);
io_uring_prep_shutdown(sqe1, fd, SHUT_RDWR);
if (fixed_files)
sqe1->flags |= IOSQE_FIXED_FILE;
sqe1->flags |= IOSQE_IO_LINK | IOSQE_CQE_SKIP_SUCCESS;
encode_userdata(sqe1, c, __SHUTDOWN, 0, fd);
if (fixed_files)
io_uring_prep_close_direct(sqe2, fd);
else
io_uring_prep_close(sqe2, fd);
encode_userdata(sqe2, c, __CLOSE, 0, fd);
}
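/*
* Cancel any pending requests on both descriptors of this connection. Each
* cancel posts a CQE, and once the last one completes, handle_cancel() will
* queue the actual shutdown + close of the descriptors.
*/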
static void queue_cancel(struct io_uring *ring, struct conn *c)
{
struct io_uring_sqe *sqe;
int flags = 0;
if (fixed_files)
flags |= IORING_ASYNC_CANCEL_FD_FIXED;
sqe = get_sqe(ring);
io_uring_prep_cancel_fd(sqe, c->in_fd, flags);
encode_userdata(sqe, c, __CANCEL, 0, c->in_fd);
c->pending_cancels++;
if (c->out_fd != -1) {
sqe = get_sqe(ring);
io_uring_prep_cancel_fd(sqe, c->out_fd, flags);
encode_userdata(sqe, c, __CANCEL, 0, c->out_fd);
c->pending_cancels++;
}
io_uring_submit(ring);
}
static int pending_shutdown(struct conn *c)
{
return c->cd[0].pending_shutdown + c->cd[1].pending_shutdown;
}
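/*
* A connection is ready to be shut down once one of its directions has
* flagged a pending shutdown and, for the proxy case, all data received so
* far has been sent on to the other side.
*/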
static bool should_shutdown(struct conn *c)
{
int i;
if (!pending_shutdown(c))
return false;
if (is_sink)
return true;
if (!bidi)
return c->cd[0].in_bytes == c->cd[1].out_bytes;
for (i = 0; i < 2; i++) {
if (c->cd[0].rcv != c->cd[1].snd)
return false;
if (c->cd[1].rcv != c->cd[0].snd)
return false;
}
return true;
}
static void __close_conn(struct io_uring *ring, struct conn *c)
{
printf("Client %d: queueing shutdown\n", c->tid);
queue_cancel(ring, c);
io_uring_submit(ring);
}
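/*
* Mark this direction as wanting a shutdown. If a send is still pending,
* defer it - the send completion path re-checks via pending_shutdown().
*/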
static void close_cd(struct conn *c, struct conn_dir *cd)
{
cd->pending_shutdown = 1;
if (cd->pending_send)
return;
if (!(c->flags & CONN_F_PENDING_SHUTDOWN)) {
gettimeofday(&c->end_time, NULL);
c->flags |= CONN_F_PENDING_SHUTDOWN | CONN_F_END_TIME;
}
}
/*
* We're done with this buffer, add it back to our pool so the kernel is
* free to use it again.
*/
static int replenish_buffer(struct conn_buf_ring *cbr, int bid, int offset)
{
void *this_buf = cbr->buf + bid * buf_size;
assert(bid < nr_bufs);
io_uring_buf_ring_add(cbr->br, this_buf, buf_size, bid, br_mask, offset);
return buf_size;
}
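/*
* A receive of 'bytes' consumed one or more buffers, starting at *bid. Hand
* all of them back to the incoming buffer ring and return how many buffers
* (packets) were used.
*/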
static int replenish_buffers(struct conn *c, int *bid, int bytes)
{
struct conn_buf_ring *cbr = &c->in_br;
int nr_packets = 0;
while (bytes) {
int this_len = replenish_buffer(cbr, *bid, nr_packets);
if (this_len > bytes)
this_len = bytes;
bytes -= this_len;
*bid = (*bid + 1) & (nr_bufs - 1);
nr_packets++;
}
io_uring_buf_ring_advance(cbr->br, nr_packets);
return nr_packets;
}
static void free_mvec(struct msg_vec *mvec)
{
free(mvec->iov);
mvec->iov = NULL;
}
static void init_mvec(struct msg_vec *mvec)
{
memset(mvec, 0, sizeof(*mvec));
mvec->iov = malloc(sizeof(struct iovec));
mvec->vec_size = 1;
}
static void init_msgs(struct conn_dir *cd)
{
memset(&cd->io_snd_msg, 0, sizeof(cd->io_snd_msg));
memset(&cd->io_rcv_msg, 0, sizeof(cd->io_rcv_msg));
init_mvec(&cd->io_snd_msg.vecs[0]);
init_mvec(&cd->io_snd_msg.vecs[1]);
init_mvec(&cd->io_rcv_msg.vecs[0]);
}
static void free_msgs(struct conn_dir *cd)
{
free_mvec(&cd->io_snd_msg.vecs[0]);
free_mvec(&cd->io_snd_msg.vecs[1]);
free_mvec(&cd->io_rcv_msg.vecs[0]);
}
static int handle_accept(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct io_uring_sqe *sqe;
struct conn *c;
int domain, i;
if (nr_conns == MAX_CONNS) {
fprintf(stderr, "max clients reached %d\n", nr_conns);
return 1;
}
c = &conns[nr_conns];
c->tid = nr_conns++;
c->in_fd = cqe->res;
c->out_fd = -1;
gettimeofday(&c->start_time, NULL);
for (i = 0; i < 2; i++) {
struct conn_dir *cd = &c->cd[i];
cd->index = i;
init_list_head(&cd->send_list);
cd->snd_next_bid = -1;
cd->rcv_next_bid = -1;
init_msgs(cd);
}
printf("New client: id=%d, in=%d\n", c->tid, c->in_fd);
if (setup_buffer_rings(ring, c))
return 1;
if (is_sink) {
submit_receive(ring, c);
return 0;
}
if (ipv6)
domain = AF_INET6;
else
domain = AF_INET;
/*
* If fixed_files is set, proxy will use fixed files for any
* new file descriptors it instantiates. Fixed files, or fixed
* descriptors, are io_uring private file descriptors. They
* cannot be accessed outside of io_uring. io_uring holds a
* fixed reference to them, which means that we do not need to
* grab per-request references to them. Particularly for
* threaded applications, grabbing and dropping file references
* for each operation can be costly as the file table is shared.
* This generally shows up as fget/fput related overhead in
* any workload profiles.
*
* Fixed descriptors are passed in via the 'fd' field just
* like regular descriptors, and then marked as such by
* setting the IOSQE_FIXED_FILE flag in the sqe->flags field.
* Some helpers do that automatically, like the below, others
* will need it set manually if they don't have a *direct*()
* helper.
*
* For operations that instantiate them, like the opening of
* a direct socket, the application may either ask the kernel
* to find a free one (as is done below), or the application
* may manage the space itself and pass in an index for a
* currently free slot in the table. If the kernel is asked
* to allocate a free direct descriptor, note that io_uring
* does not abide by the POSIX mandated "lowest free must be
* returned". It may return any free descriptor of its
* choosing.
*/
sqe = get_sqe(ring);
if (fixed_files)
io_uring_prep_socket_direct_alloc(sqe, domain, SOCK_STREAM, 0, 0);
else
io_uring_prep_socket(sqe, domain, SOCK_STREAM, 0, 0);
encode_userdata(sqe, c, __SOCK, 0, 0);
return 0;
}
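/*
* Our socket request completed, cqe->res holds the new descriptor for the
* outgoing side. Fill in the destination address and issue a connect to the
* target host.
*/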
static int handle_sock(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
struct io_uring_sqe *sqe;
int ret;
vlog("%d: sock: res=%d\n", c->tid, cqe->res);
c->out_fd = cqe->res;
if (ipv6) {
memset(&c->addr6, 0, sizeof(c->addr6));
c->addr6.sin6_family = AF_INET6;
c->addr6.sin6_port = htons(send_port);
ret = inet_pton(AF_INET6, host, &c->addr6.sin6_addr);
} else {
memset(&c->addr, 0, sizeof(c->addr));
c->addr.sin_family = AF_INET;
c->addr.sin_port = htons(send_port);
ret = inet_pton(AF_INET, host, &c->addr.sin_addr);
}
if (ret <= 0) {
if (!ret)
fprintf(stderr, "host not in right format\n");
else
perror("inet_pton");
return 1;
}
sqe = get_sqe(ring);
if (ipv6) {
io_uring_prep_connect(sqe, c->out_fd,
(struct sockaddr *) &c->addr6,
sizeof(c->addr6));
} else {
io_uring_prep_connect(sqe, c->out_fd,
(struct sockaddr *) &c->addr,
sizeof(c->addr));
}
encode_userdata(sqe, c, __CONNECT, 0, c->out_fd);
if (fixed_files)
sqe->flags |= IOSQE_FIXED_FILE;
return 0;
}
static int handle_connect(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
open_conns++;
if (bidi)
submit_bidi_receive(ring, c);
else
submit_receive(ring, c);
return 0;
}
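/*
* Append a received buffer to the send vec currently being prepared, doubling
* the allocated vec size if it's full.
*/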
static void send_append_vec(struct conn_dir *cd, void *data, int len)
{
struct msg_vec *mvec = snd_msg_vec(cd);
if (mvec->iov_len == mvec->vec_size) {
mvec->vec_size <<= 1;
mvec->iov = realloc(mvec->iov, mvec->vec_size * sizeof(struct iovec));
}
mvec->iov[mvec->iov_len].iov_base = data;
mvec->iov[mvec->iov_len].iov_len = len;
mvec->iov_len++;
}
/*
* Queue a send based on the data received in this cqe, which came from
* a completed receive operation.
*/
static void send_append(struct conn *c, struct conn_dir *cd, void *data,
int bid, int len)
{
vlog("%d: send %d (%p, bid %d)\n", c->tid, len, data, bid);
assert(bid < nr_bufs);
/* if using provided buffers for send, add it upfront */
if (send_ring) {
struct conn_buf_ring *cbr = &c->out_br;
io_uring_buf_ring_add(cbr->br, data, len, bid, br_mask, 0);
io_uring_buf_ring_advance(cbr->br, 1);
} else {
send_append_vec(cd, data, len);
}
}
/*
* Unless multishot recvmsg is used, a zero receive marks the end. For recvmsg
* with multishot, we always get the header regardless, hence a "zero receive"
* there is one whose size equals that of the header.
*/
static int recv_done_res(int res)
{
if (!res)
return 1;
if (rcv_msg && recv_mshot && res == sizeof(struct io_uring_recvmsg_out))
return 1;
return 0;
}
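/*
* Map the bytes from a completed receive to the buffer IDs they consumed,
* starting at *bid. Queue each buffer for sending - either directly into the
* outgoing send ring, or by appending it to the send vec - and return how
* many buffers (packets) were used.
*/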
static int recv_bids(struct conn *c, struct conn_dir *cd, int *bid, int in_bytes)
{
struct conn_buf_ring *cbr = &c->out_br;
struct conn_buf_ring *in_cbr = &c->in_br;
struct io_uring_buf *buf;
int nr_packets = 0;
while (in_bytes) {
int this_bytes;
void *data;
buf = &in_cbr->br->bufs[*bid];
data = (void *) (unsigned long) buf->addr;
this_bytes = buf->len;
if (this_bytes > in_bytes)
this_bytes = in_bytes;
in_bytes -= this_bytes;
if (send_ring)
io_uring_buf_ring_add(cbr->br, data, this_bytes, *bid,
br_mask, nr_packets);
else
send_append(c, cd, data, *bid, this_bytes);
*bid = (*bid + 1) & (nr_bufs - 1);
nr_packets++;
}
if (send_ring)
io_uring_buf_ring_advance(cbr->br, nr_packets);
return nr_packets;
}
static int recv_mshot_msg(struct conn *c, struct conn_dir *cd, int *bid,
int in_bytes)
{
struct conn_buf_ring *cbr = &c->out_br;
struct conn_buf_ring *in_cbr = &c->in_br;
struct io_uring_buf *buf;
int nr_packets = 0;
while (in_bytes) {
struct io_uring_recvmsg_out *pdu;
int this_bytes;
void *data;
buf = &in_cbr->br->bufs[*bid];
/*
* multishot recvmsg puts a header in front of the data - we
* have to take that into account for the send setup, and
* adjust the actual data read to not take this metadata into
* account. For this use case, namelen and controllen will not
* be set. If they were, they would need to be factored in too.
*/
buf->len -= sizeof(struct io_uring_recvmsg_out);
in_bytes -= sizeof(struct io_uring_recvmsg_out);
pdu = (void *) (unsigned long) buf->addr;
vlog("pdu namelen %d, controllen %d, payload %d flags %x\n",
pdu->namelen, pdu->controllen, pdu->payloadlen,
pdu->flags);
data = (void *) (pdu + 1);
this_bytes = pdu->payloadlen;
if (this_bytes > in_bytes)
this_bytes = in_bytes;
in_bytes -= this_bytes;
if (send_ring)
io_uring_buf_ring_add(cbr->br, data, this_bytes, *bid,
br_mask, nr_packets);
else
send_append(c, cd, data, *bid, this_bytes);
*bid = (*bid + 1) & (nr_bufs - 1);
nr_packets++;
}
if (send_ring)
io_uring_buf_ring_advance(cbr->br, nr_packets);
return nr_packets;
}
static int __handle_recv(struct io_uring *ring, struct conn *c,
struct conn_dir *cd, struct io_uring_cqe *cqe)
{
struct conn_dir *ocd = &c->cd[!cd->index];
int bid, nr_packets;
/*
* Not having a buffer attached should only happen if we get a zero
* sized receive, because the other end closed the connection. It
* cannot happen otherwise, as all our receives are using provided
* buffers and hence it's not possible to return a CQE with a non-zero
* result and not have a buffer attached.
*/
if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
cd->pending_recv = 0;
if (!recv_done_res(cqe->res)) {
fprintf(stderr, "no buffer assigned, res=%d\n", cqe->res);
return 1;
}
start_close:
prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
close_cd(c, cd);
return 0;
}
if (cqe->res && cqe->res < buf_size)
cd->rcv_shrt++;
bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
/*
* BIDI will use the same buffer pool and do receive on both CDs,
* so can't reliably check. TODO.
*/
if (!bidi && cd->rcv_next_bid != -1 && bid != cd->rcv_next_bid) {
fprintf(stderr, "recv bid %d, wanted %d\n", bid, cd->rcv_next_bid);
goto start_close;
}
vlog("%d: recv: bid=%d, res=%d, cflags=%x\n", c->tid, bid, cqe->res, cqe->flags);
assert(cqe->res);
/*
* If we're a sink, we're done here. Just replenish the buffer back
* to the pool. For proxy mode, we will send the data to the other
* end and the buffer will be replenished once the send is done with
* it.
*/
if (is_sink)
nr_packets = replenish_buffers(c, &bid, cqe->res);
else if (rcv_msg && recv_mshot)
nr_packets = recv_mshot_msg(c, ocd, &bid, cqe->res);
else
nr_packets = recv_bids(c, ocd, &bid, cqe->res);
ocd->out_buffers += nr_packets;
assert(ocd->out_buffers <= nr_bufs);
cd->rcv++;
cd->rcv_next_bid = bid;
/*
* If IORING_CQE_F_MORE isn't set, then this is either a normal recv
* that needs rearming, or it's a multishot that won't post any further
* completions. Setup a new one for these cases.
*/
if (!(cqe->flags & IORING_CQE_F_MORE)) {
cd->pending_recv = 0;
if (recv_done_res(cqe->res))
goto start_close;
if (!is_sink)
cd->rcv_need_rearm = 1;
else
cd->rcv_rearm = 1;
}
/*
* Submit a send if we won't get any more notifications from this
* recv, or if we have nr_bufs / 2 queued up. If BIDI mode, send
* every buffer. We assume this is interactive mode, and hence don't
* delay anything.
*/
if ((!ocd->pending_send && (bidi || (ocd->out_buffers >= nr_bufs / 2))) ||
!(cqe->flags & IORING_CQE_F_MORE))
prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
if (!recv_done_res(cqe->res))
cd->in_bytes += cqe->res;
return 0;
}
static int handle_recv(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
return __handle_recv(ring, c, cd, cqe);
}
static int recv_error(struct error_handler *err, struct io_uring *ring,
struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
cd->pending_recv = 0;
if (cqe->res != -ENOBUFS)
return default_error(err, ring, cqe);
recv_enobufs(ring, c, cd, other_dir_fd(c, cqe_to_fd(cqe)));
return 0;
}
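/*
* Submit a single send on 'fd'. With send_ring, no data is passed in here -
* the kernel picks buffers from the outgoing provided buffer group. With
* sendmsg, the previously prepared msghdr is used. Otherwise it's a plain
* send of the data/len passed in.
*/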
static void submit_send(struct io_uring *ring, struct conn *c,
struct conn_dir *cd, int fd, void *data, int len,
int bid)
{
struct io_uring_sqe *sqe;
int bgid = c->out_br.bgid;
if (cd->pending_send)
return;
cd->pending_send = 1;
sqe = get_sqe(ring);
if (snd_msg) {
struct io_msg *imsg = &cd->io_snd_msg;
io_uring_prep_sendmsg(sqe, fd, &imsg->msg, MSG_WAITALL|MSG_NOSIGNAL);
} else if (send_ring) {
io_uring_prep_send(sqe, fd, NULL, 0, MSG_WAITALL|MSG_NOSIGNAL);
} else {
io_uring_prep_send(sqe, fd, data, len, MSG_WAITALL|MSG_NOSIGNAL);
}
encode_userdata(sqe, c, __SEND, bid, fd);
if (fixed_files)
sqe->flags |= IOSQE_FIXED_FILE;
if (send_ring) {
sqe->flags |= IOSQE_BUFFER_SELECT;
sqe->buf_group = bgid;
}
if (snd_bundle) {
sqe->ioprio |= IORING_RECVSEND_BUNDLE;
cd->snd_mshot++;
} else if (send_ring)
cd->snd_mshot++;
}
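/*
* Prepare and submit the next send for this direction, if buffers are queued
* and no send is currently inflight. For the sendmsg case, the two msg_vecs
* are swapped so new data can be appended while the current one is being sent.
*/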
static void prep_next_send(struct io_uring *ring, struct conn *c,
struct conn_dir *cd, int fd)
{
int bid;
if (cd->pending_send || is_sink || !cd->out_buffers)
return;
bid = cd->snd_next_bid;
if (bid == -1)
bid = 0;
if (send_ring) {
submit_send(ring, c, cd, fd, NULL, 0, bid);
} else if (snd_msg) {
struct io_msg *imsg = &cd->io_snd_msg;
if (!msg_vec(imsg)->iov_len)
return;
assert(msg_vec(imsg)->iov_len);
imsg->msg.msg_iov = msg_vec(imsg)->iov;
imsg->msg.msg_iovlen = msg_vec(imsg)->iov_len;
msg_vec(imsg)->iov_len = 0;
imsg->vec_index = !imsg->vec_index;
submit_send(ring, c, cd, fd, NULL, 0, bid);
} else {
struct io_msg *imsg = &cd->io_snd_msg;
struct msg_vec *mvec = msg_vec(imsg);
struct iovec *iov;
if (mvec->iov_len == mvec->cur_iov)
return;
imsg->msg.msg_iov = msg_vec(imsg)->iov;
iov = &mvec->iov[mvec->cur_iov];
mvec->cur_iov++;
if (mvec->cur_iov == mvec->iov_len) {
mvec->iov_len = 0;
mvec->cur_iov = 0;
imsg->vec_index = !imsg->vec_index;
}
submit_send(ring, c, cd, fd, iov->iov_base, iov->iov_len, bid);
}
}
/*
* Handling a send with an outgoing send ring. The buffers came from the
* receive side; add them back to the incoming buffer ring so they can be
* used for new receives.
*/
static int handle_send_ring(struct conn *c, struct conn_dir *cd,
int bid, int bytes)
{
struct conn_buf_ring *in_cbr = &c->in_br;
struct conn_buf_ring *out_cbr = &c->out_br;
int i = 0;
while (bytes) {
struct io_uring_buf *buf = &out_cbr->br->bufs[bid];
int this_bytes;
void *this_buf;
this_bytes = buf->len;
if (this_bytes > bytes)
this_bytes = bytes;
cd->out_bytes += this_bytes;
vlog("%d: send: bid=%d, len=%d\n", c->tid, bid, this_bytes);
this_buf = in_cbr->buf + bid * buf_size;
io_uring_buf_ring_add(in_cbr->br, this_buf, buf_size, bid, br_mask, i);
/*
* Find the provided buffer that the receive consumed, and
* which we then used for the send, and add it back to the
* pool so it can get picked by another receive. Once the send
* is done, we're done with it.
*/
bid = (bid + 1) & (nr_bufs - 1);
bytes -= this_bytes;
i++;
}
cd->snd_next_bid = bid;
io_uring_buf_ring_advance(in_cbr->br, i);
if (pending_shutdown(c))
close_cd(c, cd);
return i;
}
/*
* sendmsg, or send without a ring. Just add the buffers back to the incoming
* ring for receives.
*/
static int handle_send_buf(struct conn *c, struct conn_dir *cd, int bid,
int bytes)
{
struct conn_buf_ring *in_cbr = &c->in_br;
int i = 0;
while (bytes) {
struct io_uring_buf *buf = &in_cbr->br->bufs[bid];
int this_bytes;
this_bytes = bytes;
if (this_bytes > buf->len)
this_bytes = buf->len;
vlog("%d: send: bid=%d, len=%d\n", c->tid, bid, this_bytes);
cd->out_bytes += this_bytes;
/* each recvmsg mshot packet has this overhead */
if (rcv_msg && recv_mshot)
cd->out_bytes += sizeof(struct io_uring_recvmsg_out);
replenish_buffer(in_cbr, bid, i);
bid = (bid + 1) & (nr_bufs - 1);
bytes -= this_bytes;
i++;
}
io_uring_buf_ring_advance(in_cbr->br, i);
cd->snd_next_bid = bid;
return i;
}
static int __handle_send(struct io_uring *ring, struct conn *c,
struct conn_dir *cd, struct io_uring_cqe *cqe)
{
struct conn_dir *ocd;
int bid, nr_packets;
if (send_ring) {
if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
fprintf(stderr, "no buffer in send?! %d\n", cqe->res);
return 1;
}
bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
} else {
bid = cqe_to_bid(cqe);
}
if (cqe->res && cqe->res < buf_size)
cd->snd_shrt++;
/*
* BIDI will use the same buffer pool and do sends on both CDs,
* so can't reliably check. TODO.
*/
if (!bidi && send_ring && cd->snd_next_bid != -1 && bid != cd->snd_next_bid) {
fprintf(stderr, "send bid %d, wanted %d at %lu\n", bid,
cd->snd_next_bid, cd->out_bytes);
goto out_close;
}
assert(bid <= nr_bufs);
vlog("send: got %d, %lu\n", cqe->res, cd->out_bytes);
if (send_ring)
nr_packets = handle_send_ring(c, cd, bid, cqe->res);
else
nr_packets = handle_send_buf(c, cd, bid, cqe->res);
cd->out_buffers -= nr_packets;
assert(cd->out_buffers >= 0);
cd->snd++;
ocd = &c->cd[!cd->index];
if (!ocd->pending_recv) {
int fd = other_dir_fd(c, cqe_to_fd(cqe));
__submit_receive(ring, c, ocd, fd);
}
if (!(cqe->flags & IORING_CQE_F_MORE)) {
cd->pending_send = 0;
/*
* send done - see if the current vec has data to submit, and
* do so if it does. if it doesn't have data yet, nothing to
* do.
*/
prep_next_send(ring, c, cd, cqe_to_fd(cqe));
out_close:
if (pending_shutdown(c))
close_cd(c, cd);
}
vlog("%d: pending sends %d\n", c->tid, cd->pending_send);
return 0;
}
static int handle_send(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
return __handle_send(ring, c, cd, cqe);
}
static int send_error(struct error_handler *err, struct io_uring *ring,
struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
struct conn_dir *ocd = &c->cd[!cd->index];
cd->pending_send = 0;
if (cqe->res != -ENOBUFS)
return default_error(err, ring, cqe);
ocd->rcv_rearm = 1;
cd->snd_enobufs++;
return 0;
}
/*
* We don't expect to get here, as we marked it with skipping posting a
* CQE if it was successful. If it does trigger, that means it failed and
* that our close has not been done. Log the shutdown error and issue a new
* separate close.
*/
static int handle_shutdown(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
struct io_uring_sqe *sqe;
int fd = cqe_to_fd(cqe);
fprintf(stderr, "Got shutdown notication on fd %d\n", fd);
if (!cqe->res)
fprintf(stderr, "Unexpected success shutdown CQE\n");
else if (cqe->res < 0)
fprintf(stderr, "Shutdown got %s\n", strerror(-cqe->res));
sqe = get_sqe(ring);
if (fixed_files)
io_uring_prep_close_direct(sqe, fd);
else
io_uring_prep_close(sqe, fd);
encode_userdata(sqe, c, __CLOSE, 0, fd);
return 0;
}
/*
* Final stage of a connection, the shutdown and close has finished. Mark
* it as disconnected and let the main loop reap it.
*/
static int handle_close(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
int fd = cqe_to_fd(cqe);
c->flags |= CONN_F_DISCONNECTED;
printf("Closed client: id=%d, in_fd=%d, out_fd=%d\n", c->tid, c->in_fd, c->out_fd);
if (fd == c->in_fd)
c->in_fd = -1;
else if (fd == c->out_fd)
c->out_fd = -1;
if (c->in_fd == -1 && c->out_fd == -1) {
__show_stats(c);
open_conns--;
free_buffer_rings(ring, c);
free_msgs(&c->cd[0]);
free_msgs(&c->cd[1]);
}
return 0;
}
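/*
* A cancel request completed. Once every pending cancel for this connection
* has finished, queue the shutdown + close sequence for both descriptors.
*/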
static int handle_cancel(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
int fd = cqe_to_fd(cqe);
c->pending_cancels--;
vlog("%d: got cancel fd %d, refs %d\n", c->tid, fd, c->pending_cancels);
if (!c->pending_cancels) {
queue_shutdown_close(ring, c, c->in_fd);
if (c->out_fd != -1)
queue_shutdown_close(ring, c, c->out_fd);
io_uring_submit(ring);
}
return 0;
}
/*
* Called for each CQE that we receive. Decode the request type that it
* came from, and call the appropriate handler.
*/
static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
{
int ret;
/*
* Unlikely, but there's an error in this CQE. If an error handler
* is defined, call it, and that will deal with it. If no error
* handler is defined, the opcode handler either doesn't care or will
* handle it on its own.
*/
if (cqe->res < 0) {
struct error_handler *err = &error_handlers[cqe_to_op(cqe)];
if (err->error_fn)
return err->error_fn(err, ring, cqe);
}
switch (cqe_to_op(cqe)) {
case __ACCEPT:
ret = handle_accept(ring, cqe);
break;
case __SOCK:
ret = handle_sock(ring, cqe);
break;
case __CONNECT:
ret = handle_connect(ring, cqe);
break;
case __RECV:
ret = handle_recv(ring, cqe);
break;
case __SEND:
ret = handle_send(ring, cqe);
break;
case __CANCEL:
ret = handle_cancel(ring, cqe);
break;
case __SHUTDOWN:
ret = handle_shutdown(ring, cqe);
break;
case __CLOSE:
ret = handle_close(ring, cqe);
break;
default:
fprintf(stderr, "bad user data %lx\n", (long) cqe->user_data);
return 1;
}
return ret;
}
static void usage(const char *name)
{
printf("%s:\n", name);
printf("\t-m:\t\tUse multishot receive (%d)\n", recv_mshot);
printf("\t-d:\t\tUse DEFER_TASKRUN (%d)\n", defer_tw);
printf("\t-S:\t\tUse SQPOLL (%d)\n", sqpoll);
printf("\t-f:\t\tUse only fixed files (%d)\n", fixed_files);
printf("\t-t:\t\tTimeout for waiting on CQEs (usec) (%d)\n", wait_usec);
printf("\t-w:\t\tNumber of CQEs to wait for each loop (%d)\n", wait_batch);
printf("\t-B:\t\tUse bi-directional mode (%d)\n", bidi);
printf("\t-s:\t\tAct only as a sink (%d)\n", is_sink);
printf("\t-q:\t\tRing size to use (%d)\n", ring_size);
printf("\t-H:\t\tHost to connect to (%s)\n", host);
printf("\t-r:\t\tPort to receive on (%d)\n", receive_port);
printf("\t-p:\t\tPort to connect to (%d)\n", send_port);
printf("\t-6:\t\tUse IPv6 (%d)\n", ipv6);
printf("\t-N:\t\tUse NAPI polling (%d)\n", napi);
printf("\t-T:\t\tNAPI timeout (usec) (%d)\n", napi_timeout);
printf("\t-b:\t\tSend/receive buf size (%d)\n", buf_size);
printf("\t-n:\t\tNumber of provided buffers (pow2) (%d)\n", nr_bufs);
printf("\t-u:\t\tUse provided buffers for send (%d)\n", send_ring);
printf("\t-C:\t\tUse bundles for send (%d)\n", snd_bundle);
printf("\t-c:\t\tUse bundles for recv (%d)\n", snd_bundle);
printf("\t-M:\t\tUse sendmsg (%d)\n", snd_msg);
printf("\t-M:\t\tUse recvmsg (%d)\n", rcv_msg);
printf("\t-V:\t\tIncrease verbosity (%d)\n", verbose);
}
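/*
* Run once per event loop iteration: re-arm any receives that were deferred
* (e.g. after -ENOBUFS), and start the shutdown sequence for connections
* that are done transferring data.
*/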
static void house_keeping(struct io_uring *ring)
{
struct conn_dir *cd;
struct conn *c;
int i, j;
vlog("House keeping entered\n");
for (i = 0; i < nr_conns; i++) {
c = &conns[i];
if (c->flags & CONN_F_DISCONNECTED) {
vlog("%d: disconnected\n", i);
continue;
}
for (j = 0; j < 2; j++) {
int in_fd;
cd = &c->cd[j];
if (!j)
in_fd = c->in_fd;
else
in_fd = c->out_fd;
if (cd->rcv_rearm) {
vlog("%d: rcv rearm on %d\n", i, j);
cd->rcv_rearm = 0;
if (!cd->pending_recv)
__submit_receive(ring, c, cd, in_fd);
}
}
if (c->flags & CONN_F_DISCONNECTING)
continue;
if (should_shutdown(c)) {
__close_conn(ring, c);
c->flags |= CONN_F_DISCONNECTING;
}
}
}
/*
* Main event loop. Submit our multishot accept request, and then just loop
* around handling incoming events.
*/
static int event_loop(struct io_uring *ring, int fd)
{
struct __kernel_timespec active_ts, idle_ts = { .tv_sec = 1, };
struct io_uring_sqe *sqe;
int flags;
/*
* proxy provides a way to use either multishot receive or not, but
* for accept, we always use multishot. A multishot accept request
* needs to be armed only once, and then it'll trigger a completion and
* post a CQE whenever a new connection is accepted. No need to do
* anything else, unless the multishot accept terminates. This happens
* if it encounters an error. Applications should check for
* IORING_CQE_F_MORE in cqe->flags - this tells you if more completions
* are expected from this request or not. Non-multishot requests never have
* this set, whereas multishot requests will always have it set unless an
* error occurs.
*/
sqe = get_sqe(ring);
if (fixed_files)
io_uring_prep_multishot_accept_direct(sqe, fd, NULL, NULL, 0);
else
io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
__encode_userdata(sqe, 0, __ACCEPT, 0, fd);
active_ts = idle_ts;
if (wait_usec > 1000000) {
active_ts.tv_sec = wait_usec / 1000000;
wait_usec -= active_ts.tv_sec * 1000000;
}
active_ts.tv_nsec = wait_usec * 1000;
flags = 0;
while (1) {
struct __kernel_timespec *ts = &idle_ts;
struct io_uring_cqe *cqe;
unsigned int head;
int ret, i, to_wait;
/*
* If wait_batch is set higher than 1, then we'll wait on
* that amount of CQEs to be posted each loop. If used with
* DEFER_TASKRUN, this can provide a substantial reduction
* in context switch rate as the task isn't woken until the
* requested number of events can be returned.
*
* Can be used with -t to set a wait_usec timeout as well.
* For example, if an application can deal with 250 usec
* of wait latencies, it can set -w8 -t250 which will cause
* io_uring to return when either 8 events have been received,
* or if 250 usec of waiting has passed.
*
* If we don't have any open connections, wait on just 1
* always.
*/
to_wait = 1;
if (open_conns && !flags) {
ts = &active_ts;
to_wait = open_conns * wait_batch;
}
vlog("Submit and wait for %d\n", to_wait);
ret = io_uring_submit_and_wait_timeout(ring, &cqe, to_wait, ts, NULL);
vlog("Submit and wait: %d\n", ret);
i = flags = 0;
io_uring_for_each_cqe(ring, head, cqe) {
if (handle_cqe(ring, cqe))
return 1;
flags |= cqe_to_conn(cqe)->flags;
++i;
}
vlog("Handled %d events\n", i);
/*
* Advance the CQ ring for seen events when we've processed
* all of them in this loop. This can also be done with
* io_uring_cqe_seen() in each handler above, which just marks
* that single CQE as seen. However, it's more efficient to
* mark a batch as seen when we're done with that batch.
*/
if (i) {
io_uring_cq_advance(ring, i);
events += i;
}
house_keeping(ring);
event_loops++;
}
return 0;
}
/*
* Option parsing and the ring / net setup
*/
int main(int argc, char *argv[])
{
struct io_uring ring;
struct io_uring_params params;
struct sigaction sa = { };
const char *optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:6Vh?";
int opt, ret, fd;
page_size = sysconf(_SC_PAGESIZE);
if (page_size < 0) {
perror("sysconf(_SC_PAGESIZE)");
return 1;
}
while ((opt = getopt(argc, argv, optstring)) != -1) {
switch (opt) {
case 'm':
recv_mshot = !!atoi(optarg);
break;
case 'S':
sqpoll = !!atoi(optarg);
break;
case 'd':
defer_tw = !!atoi(optarg);
break;
case 'b':
buf_size = atoi(optarg);
break;
case 'n':
nr_bufs = atoi(optarg);
break;
case 'u':
send_ring = !!atoi(optarg);
break;
case 'c':
rcv_bundle = !!atoi(optarg);
break;
case 'C':
snd_bundle = !!atoi(optarg);
break;
case 'w':
wait_batch = atoi(optarg);
break;
case 't':
wait_usec = atoi(optarg);
break;
case 's':
is_sink = !!atoi(optarg);
break;
case 'f':
fixed_files = !!atoi(optarg);
break;
case 'H':
host = strdup(optarg);
break;
case 'r':
receive_port = atoi(optarg);
break;
case 'p':
send_port = atoi(optarg);
break;
case 'B':
bidi = !!atoi(optarg);
break;
case 'N':
napi = !!atoi(optarg);
break;
case 'T':
napi_timeout = atoi(optarg);
break;
case '6':
ipv6 = true;
break;
case 'M':
snd_msg = !!atoi(optarg);
break;
case 'R':
rcv_msg = !!atoi(optarg);
break;
case 'q':
ring_size = atoi(optarg);
break;
case 'V':
verbose++;
break;
case 'h':
default:
usage(argv[0]);
return 1;
}
}
if (bidi && is_sink) {
fprintf(stderr, "Can't be both bidi proxy and sink\n");
return 1;
}
if (snd_msg && sqpoll) {
fprintf(stderr, "SQPOLL with msg variants disabled\n");
snd_msg = 0;
}
if (rcv_msg && rcv_bundle) {
fprintf(stderr, "Can't use bundles with recvmsg\n");
rcv_msg = 0;
}
if (snd_msg && snd_bundle) {
fprintf(stderr, "Can't use bundles with recvmsg\n");
snd_msg = 0;
}
if (snd_msg && send_ring) {
fprintf(stderr, "Can't use send ring sendmsg\n");
snd_msg = 0;
}
/*
* For recvmsg w/multishot, we waste some data at the head of the
* packet every time. Adjust the buffer size to account for that,
* so we're still handling 'buf_size' worth of actual payload data.
*/
if (rcv_msg && recv_mshot) {
fprintf(stderr, "Adjusted buf size for recvmsg w/multishot\n");
buf_size += sizeof(struct io_uring_recvmsg_out);
}
br_mask = nr_bufs - 1;
fd = setup_listening_socket(receive_port, ipv6);
if (is_sink)
send_port = -1;
if (fd == -1)
return 1;
atexit(show_stats);
sa.sa_handler = sig_int;
sa.sa_flags = SA_RESTART;
sigaction(SIGINT, &sa, NULL);
/*
* By default, set us up with a big CQ ring. Not strictly needed
* here, but it's very important to never overflow the CQ ring.
* Events will not be dropped if this happens, but it does slow
* the application down when dealing with overflowed events.
*
* Set SINGLE_ISSUER, which tells the kernel that only one thread
* is doing IO submissions. This enables certain optimizations in
* the kernel.
*/
memset(&params, 0, sizeof(params));
params.flags |= IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_CLAMP;
params.flags |= IORING_SETUP_CQSIZE;
params.cq_entries = 131072;
/*
* DEFER_TASKRUN decouples async event reaping and retrying from
* regular system calls. If this isn't set, then io_uring uses
* normal task_work for this. task_work is always being run on any
* exit to userspace. Real applications do more than just call IO
* related system calls, and hence we can be running this work way
* too often. Using DEFER_TASKRUN defers any task_work running to
* when the application enters the kernel anyway to wait on new
* events. It's generally the preferred and recommended way to set up
* a ring.
*/
if (defer_tw) {
params.flags |= IORING_SETUP_DEFER_TASKRUN;
sqpoll = 0;
}
/*
* SQPOLL offloads any request submission and retry operations to a
* dedicated thread. This enables an application to do IO without
* ever having to enter the kernel itself. The SQPOLL thread will
* stay busy as long as there's work to do, and go to sleep if
* sq_thread_idle msecs have passed. If it's running, submitting new
* IO just requires making the SQEs visible to the SQPOLL thread; the
* application need not enter the kernel. For submission, the application
* will only enter the kernel if the SQPOLL thread has been idle long
* enough that it has gone to sleep.
*
* Waiting on events still needs to enter the kernel, if none are
* available. The application may also use io_uring_peek_cqe() to
* check for new events without entering the kernel, as completions
* will be continually produced to the CQ ring by the SQPOLL thread
* as they occur.
*/
if (sqpoll) {
params.flags |= IORING_SETUP_SQPOLL;
params.sq_thread_idle = 1000;
defer_tw = 0;
}
/*
* If neither DEFER_TASKRUN nor SQPOLL is used, set COOP_TASKRUN. This
* avoids heavy signal based notifications, which can force an
* application to enter the kernel and process them as soon as they
* occur.
*/
if (!sqpoll && !defer_tw)
params.flags |= IORING_SETUP_COOP_TASKRUN;
/*
* The SQ ring size need not be larger than any batch of requests
* that need to be prepared before submit. Normally in a loop we'd
* only need a few, if any, particularly if multishot is used.
*/
ret = io_uring_queue_init_params(ring_size, &ring, &params);
if (ret) {
fprintf(stderr, "%s\n", strerror(-ret));
return 1;
}
/*
* If ring provided buffers for sends are available and no option was
* given to use them or not, default send_ring to on. If it was turned
* on and the kernel doesn't support it, turn it off.
*/
if (params.features & IORING_FEAT_SEND_BUF_SELECT) {
if (send_ring == -1)
send_ring = 1;
} else {
if (send_ring == 1) {
fprintf(stderr, "Kernel doesn't support ring provided "
"buffers for sends, disabled\n");
}
send_ring = 0;
}
if (!send_ring && snd_bundle) {
fprintf(stderr, "Can't use send bundle without send_ring\n");
snd_bundle = 0;
}
if (fixed_files) {
/*
* If fixed files are used, we need to allocate a fixed file
* table upfront where new direct descriptors can be managed.
*/
ret = io_uring_register_files_sparse(&ring, 4096);
if (ret) {
fprintf(stderr, "file register: %d\n", ret);
return 1;
}
/*
* If fixed files are used, we also register the ring fd. See the
* comment near io_uring_prep_socket_direct_alloc() in handle_accept()
* above. This avoids the fget/fput overhead associated with
* the io_uring_enter(2) system call itself, which is used to
* submit and wait on events.
*/
ret = io_uring_register_ring_fd(&ring);
if (ret != 1) {
fprintf(stderr, "ring register: %d\n", ret);
return 1;
}
}
if (napi) {
struct io_uring_napi n = {
.prefer_busy_poll = napi > 1 ? 1 : 0,
.busy_poll_to = napi_timeout,
};
ret = io_uring_register_napi(&ring, &n);
if (ret) {
fprintf(stderr, "io_uring_register_napi: %d\n", ret);
if (ret != -EINVAL)
return 1;
fprintf(stderr, "NAPI not available, turned off\n");
}
}
printf("Backend: sqpoll=%d, defer_tw=%d, fixed_files=%d "
"is_sink=%d, buf_size=%d, nr_bufs=%d, host=%s, send_port=%d "
"receive_port=%d, napi=%d, napi_timeout=%d\n",
sqpoll, defer_tw, fixed_files, is_sink,
buf_size, nr_bufs, host, send_port, receive_port,
napi, napi_timeout);
printf(" recv options: recvmsg=%d, recv_mshot=%d, recv_bundle=%d\n",
rcv_msg, recv_mshot, rcv_bundle);
printf(" send options: sendmsg=%d, send_ring=%d, send_bundle=%d\n",
snd_msg, send_ring, snd_bundle);
return event_loop(&ring, fd);
}