engines/rbd: sort IO by start time before waiting on it

Instead of waiting on whichever in-flight IO we happen to find first,
wait on the oldest one. At higher queue depths, this should yield
better performance and lower latencies.

Signed-off-by: Jens Axboe <axboe@fb.com>
diff --git a/engines/rbd.c b/engines/rbd.c
index 52fc8c9..f0a0e5b 100644
--- a/engines/rbd.c
+++ b/engines/rbd.c
@@ -21,6 +21,7 @@
 	rados_ioctx_t io_ctx;
 	rbd_image_t image;
 	struct io_u **aio_events;
+	struct io_u **sort_events;
 };
 
 struct rbd_options {
@@ -77,67 +78,64 @@
 static int _fio_setup_rbd_data(struct thread_data *td,
 			       struct rbd_data **rbd_data_ptr)
 {
-	struct rbd_data *rbd_data;
+	struct rbd_data *rbd;
 
 	if (td->io_ops->data)
 		return 0;
 
-	rbd_data = malloc(sizeof(struct rbd_data));
-	if (!rbd_data)
+	rbd = calloc(1, sizeof(struct rbd_data));
+	if (!rbd)
 		goto failed;
 
-	memset(rbd_data, 0, sizeof(struct rbd_data));
-
-	rbd_data->aio_events = malloc(td->o.iodepth * sizeof(struct io_u *));
-	if (!rbd_data->aio_events)
+	rbd->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
+	if (!rbd->aio_events)
 		goto failed;
 
-	memset(rbd_data->aio_events, 0, td->o.iodepth * sizeof(struct io_u *));
+	rbd->sort_events = calloc(td->o.iodepth, sizeof(struct io_u *));
+	if (!rbd->sort_events)
+		goto failed;
 
-	*rbd_data_ptr = rbd_data;
-
+	*rbd_data_ptr = rbd;
 	return 0;
 
 failed:
-	if (rbd_data)
-		free(rbd_data);
+	if (rbd)
+		free(rbd->aio_events);	/* else leaked when sort_events fails */
+	free(rbd);
 	return 1;
-
 }
 
 static int _fio_rbd_connect(struct thread_data *td)
 {
-	struct rbd_data *rbd_data = td->io_ops->data;
+	struct rbd_data *rbd = td->io_ops->data;
 	struct rbd_options *o = td->eo;
 	int r;
 
-	r = rados_create(&rbd_data->cluster, o->client_name);
+	r = rados_create(&rbd->cluster, o->client_name);
 	if (r < 0) {
 		log_err("rados_create failed.\n");
 		goto failed_early;
 	}
 
-	r = rados_conf_read_file(rbd_data->cluster, NULL);
+	r = rados_conf_read_file(rbd->cluster, NULL);
 	if (r < 0) {
 		log_err("rados_conf_read_file failed.\n");
 		goto failed_early;
 	}
 
-	r = rados_connect(rbd_data->cluster);
+	r = rados_connect(rbd->cluster);
 	if (r < 0) {
 		log_err("rados_connect failed.\n");
 		goto failed_shutdown;
 	}
 
-	r = rados_ioctx_create(rbd_data->cluster, o->pool_name,
-			       &rbd_data->io_ctx);
+	r = rados_ioctx_create(rbd->cluster, o->pool_name, &rbd->io_ctx);
 	if (r < 0) {
 		log_err("rados_ioctx_create failed.\n");
 		goto failed_shutdown;
 	}
 
-	r = rbd_open(rbd_data->io_ctx, o->rbd_name, &rbd_data->image,
-		     NULL /*snap */ );
+	r = rbd_open(rbd->io_ctx, o->rbd_name, &rbd->image, NULL /*snap */ );
 	if (r < 0) {
 		log_err("rbd_open failed.\n");
 		goto failed_open;
@@ -145,34 +143,34 @@
 	return 0;
 
 failed_open:
-	rados_ioctx_destroy(rbd_data->io_ctx);
-	rbd_data->io_ctx = NULL;
+	rados_ioctx_destroy(rbd->io_ctx);
+	rbd->io_ctx = NULL;
 failed_shutdown:
-	rados_shutdown(rbd_data->cluster);
-	rbd_data->cluster = NULL;
+	rados_shutdown(rbd->cluster);
+	rbd->cluster = NULL;
 failed_early:
 	return 1;
 }
 
-static void _fio_rbd_disconnect(struct rbd_data *rbd_data)
+static void _fio_rbd_disconnect(struct rbd_data *rbd)
 {
-	if (!rbd_data)
+	if (!rbd)
 		return;
 
 	/* shutdown everything */
-	if (rbd_data->image) {
-		rbd_close(rbd_data->image);
-		rbd_data->image = NULL;
+	if (rbd->image) {
+		rbd_close(rbd->image);
+		rbd->image = NULL;
 	}
 
-	if (rbd_data->io_ctx) {
-		rados_ioctx_destroy(rbd_data->io_ctx);
-		rbd_data->io_ctx = NULL;
+	if (rbd->io_ctx) {
+		rados_ioctx_destroy(rbd->io_ctx);
+		rbd->io_ctx = NULL;
 	}
 
-	if (rbd_data->cluster) {
-		rados_shutdown(rbd_data->cluster);
-		rbd_data->cluster = NULL;
+	if (rbd->cluster) {
+		rados_shutdown(rbd->cluster);
+		rbd->cluster = NULL;
 	}
 }
 
@@ -199,20 +197,19 @@
 
 static struct io_u *fio_rbd_event(struct thread_data *td, int event)
 {
-	struct rbd_data *rbd_data = td->io_ops->data;
+	struct rbd_data *rbd = td->io_ops->data;
 
-	return rbd_data->aio_events[event];
+	return rbd->aio_events[event];
 }
 
-static inline int fri_check_complete(struct rbd_data *rbd_data,
-				     struct io_u *io_u,
+static inline int fri_check_complete(struct rbd_data *rbd, struct io_u *io_u,
 				     unsigned int *events)
 {
 	struct fio_rbd_iou *fri = io_u->engine_data;
 
 	if (fri->io_complete) {
 		fri->io_seen = 1;
-		rbd_data->aio_events[*events] = io_u;
+		rbd->aio_events[*events] = io_u;
 		(*events)++;
 
 		rbd_aio_release(fri->completion);
@@ -222,32 +219,88 @@
 	return 0;
 }
 
+static inline int rbd_io_u_seen(struct io_u *io_u)
+{
+	struct fio_rbd_iou *fri = io_u->engine_data;
+
+	return fri->io_seen;
+}
+
+static void rbd_io_u_wait_complete(struct io_u *io_u)
+{
+	struct fio_rbd_iou *fri = io_u->engine_data;
+
+	rbd_aio_wait_for_complete(fri->completion);
+}
+
+/* qsort() comparator for io_u pointers: oldest issue sorts first. */
+static int rbd_io_u_cmp(const void *p1, const void *p2)
+{
+	const struct io_u **a = (const struct io_u **) p1;
+	const struct io_u **b = (const struct io_u **) p2;
+	uint64_t at, bt;
+
+	/* assumes utime_since_now() is elapsed usec: larger means older */
+	at = utime_since_now(&(*a)->start_time);
+	bt = utime_since_now(&(*b)->start_time);
+
+	if (at == bt)
+		return 0;
+
+	return at > bt ? -1 : 1;
+}
+
 static int rbd_iter_events(struct thread_data *td, unsigned int *events,
 			   unsigned int min_evts, int wait)
 {
-	struct rbd_data *rbd_data = td->io_ops->data;
+	struct rbd_data *rbd = td->io_ops->data;
 	unsigned int this_events = 0;
 	struct io_u *io_u;
-	int i;
+	int i, sidx;
 
+	sidx = 0;
 	io_u_qiter(&td->io_u_all, io_u, i) {
-		struct fio_rbd_iou *fri = io_u->engine_data;
-
 		if (!(io_u->flags & IO_U_F_FLIGHT))
 			continue;
-		if (fri->io_seen)
+		if (rbd_io_u_seen(io_u))
 			continue;
 
-		if (fri_check_complete(rbd_data, io_u, events))
+		if (fri_check_complete(rbd, io_u, events))
 			this_events++;
-		else if (wait) {
-			rbd_aio_wait_for_complete(fri->completion);
+		else if (wait)
+			rbd->sort_events[sidx++] = io_u;
+	}
 
-			if (fri_check_complete(rbd_data, io_u, events))
-				this_events++;
+	if (!wait || !sidx)
+		return this_events;
+
+	/*
+	 * Sort events, oldest issue first, then wait on as many as we
+	 * need in order of age. If we have enough events, stop waiting,
+	 * and just check if any of the older ones are done.
+	 */
+	if (sidx > 1)
+		qsort(rbd->sort_events, sidx, sizeof(struct io_u *), rbd_io_u_cmp);
+
+	for (i = 0; i < sidx; i++) {
+		io_u = rbd->sort_events[i];
+
+		if (fri_check_complete(rbd, io_u, events)) {
+			this_events++;
+			continue;
 		}
+
+		/*
+		 * Stop waiting when we have enough, but continue checking
+		 * all pending IOs if they are complete.
+		 */
 		if (*events >= min_evts)
-			break;
+			continue;
+
+		rbd_io_u_wait_complete(io_u);
+
+		if (fri_check_complete(rbd, io_u, events))
+			this_events++;
 	}
 
 	return this_events;
@@ -279,7 +332,7 @@
 
 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
 {
-	struct rbd_data *rbd_data = td->io_ops->data;
+	struct rbd_data *rbd = td->io_ops->data;
 	struct fio_rbd_iou *fri = io_u->engine_data;
 	int r = -1;
 
@@ -296,32 +349,30 @@
 	}
 
 	if (io_u->ddir == DDIR_WRITE) {
-		r = rbd_aio_write(rbd_data->image, io_u->offset,
-				  io_u->xfer_buflen, io_u->xfer_buf,
-				  fri->completion);
+		r = rbd_aio_write(rbd->image, io_u->offset, io_u->xfer_buflen,
+					 io_u->xfer_buf, fri->completion);
 		if (r < 0) {
 			log_err("rbd_aio_write failed.\n");
 			goto failed_comp;
 		}
 
 	} else if (io_u->ddir == DDIR_READ) {
-		r = rbd_aio_read(rbd_data->image, io_u->offset,
-				 io_u->xfer_buflen, io_u->xfer_buf,
-				 fri->completion);
+		r = rbd_aio_read(rbd->image, io_u->offset, io_u->xfer_buflen,
+					io_u->xfer_buf, fri->completion);
 
 		if (r < 0) {
 			log_err("rbd_aio_read failed.\n");
 			goto failed_comp;
 		}
 	} else if (io_u->ddir == DDIR_TRIM) {
-		r = rbd_aio_discard(rbd_data->image, io_u->offset,
-				 io_u->xfer_buflen, fri->completion);
+		r = rbd_aio_discard(rbd->image, io_u->offset,
+					io_u->xfer_buflen, fri->completion);
 		if (r < 0) {
 			log_err("rbd_aio_discard failed.\n");
 			goto failed_comp;
 		}
 	} else if (io_u->ddir == DDIR_SYNC) {
-		r = rbd_aio_flush(rbd_data->image, fri->completion);
+		r = rbd_aio_flush(rbd->image, fri->completion);
 		if (r < 0) {
 			log_err("rbd_flush failed.\n");
 			goto failed_comp;
@@ -359,35 +410,35 @@
 
 static void fio_rbd_cleanup(struct thread_data *td)
 {
-	struct rbd_data *rbd_data = td->io_ops->data;
+	struct rbd_data *rbd = td->io_ops->data;
 
-	if (rbd_data) {
-		_fio_rbd_disconnect(rbd_data);
-		free(rbd_data->aio_events);
-		free(rbd_data);
+	if (rbd) {
+		_fio_rbd_disconnect(rbd);
+		free(rbd->aio_events);
+		free(rbd->sort_events);
+		free(rbd);
 	}
-
 }
 
 static int fio_rbd_setup(struct thread_data *td)
 {
-	int r = 0;
 	rbd_image_info_t info;
 	struct fio_file *f;
-	struct rbd_data *rbd_data = NULL;
+	struct rbd_data *rbd = NULL;
 	int major, minor, extra;
+	int r;
 
 	/* log version of librbd. No cluster connection required. */
 	rbd_version(&major, &minor, &extra);
 	log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
 
 	/* allocate engine specific structure to deal with librbd. */
-	r = _fio_setup_rbd_data(td, &rbd_data);
+	r = _fio_setup_rbd_data(td, &rbd);
 	if (r) {
 		log_err("fio_setup_rbd_data failed.\n");
 		goto cleanup;
 	}
-	td->io_ops->data = rbd_data;
+	td->io_ops->data = rbd;
 
 	/* librbd does not allow us to run first in the main thread and later
 	 * in a fork child. It needs to be the same process context all the
@@ -406,7 +457,7 @@
 	}
 
 	/* get size of the RADOS block device */
-	r = rbd_stat(rbd_data->image, &info, sizeof(info));
+	r = rbd_stat(rbd->image, &info, sizeof(info));
 	if (r < 0) {
 		log_err("rbd_status failed.\n");
 		goto disconnect;
@@ -428,11 +479,11 @@
 	/* disconnect, then we were only connected to determine
 	 * the size of the RBD.
 	 */
-	_fio_rbd_disconnect(rbd_data);
+	_fio_rbd_disconnect(rbd);
 	return 0;
 
 disconnect:
-	_fio_rbd_disconnect(rbd_data);
+	_fio_rbd_disconnect(rbd);
 cleanup:
 	fio_rbd_cleanup(td);
 	return r;
@@ -446,9 +497,9 @@
 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
 {
 #if defined(CONFIG_RBD_INVAL)
-	struct rbd_data *rbd_data = td->io_ops->data;
+	struct rbd_data *rbd = td->io_ops->data;
 
-	return rbd_invalidate_cache(rbd_data->image);
+	return rbd_invalidate_cache(rbd->image);
 #else
 	return 0;
 #endif