decoders: fix display order of frames in non-blocking mode

Making the tests check the order of frames revealed that frames were
sometimes presented out-of-order when decoding in non-blocking mode,
i.e. when there can be more than one frame in the ready_queue of the
decoder.

The cause is that we test all the frames in the queue for readiness, and
return all those that pass the test. This leaves space for a race
condition where frame X is not ready at the time of testing, and frame
X + 1 becomes ready by the time it is tested. When this happens, frame
X + 1 is returned before frame X.

Fix this by stopping filtering on the first frame that signals it is not
ready. Since the decoder processes the jobs we give it in-order, this
guarantees that the frames will always be returned in the correct order.
diff --git a/src/decoders/h264/decoder.rs b/src/decoders/h264/decoder.rs
index 903bb87..e4c749b 100644
--- a/src/decoders/h264/decoder.rs
+++ b/src/decoders/h264/decoder.rs
@@ -1715,27 +1715,33 @@
 
     /// Get the DecodedFrameHandles for the pictures in the ready queue, in display order.
     fn get_ready_frames(&mut self) -> Vec<T> {
-        let (ready, retained): (Vec<_>, Vec<_>) = std::mem::take(&mut self.ready_queue)
-            // Unfortunately `BinaryHeap`'s `iter()` does not guarantee the order, so we
-            // need to convert into a vector first.
-            .into_sorted_vec()
+        // Unfortunately `BinaryHeap`'s `iter()` does not guarantee the order, so we
+        // need to convert into a vector first.
+        let mut ready_queue = std::mem::take(&mut self.ready_queue).into_sorted_vec();
+        ready_queue.reverse();
+
+        // Count all ready handles.
+        let num_ready = ready_queue
+            .iter()
+            .take_while(|&picture| self.backend.handle_is_ready(&picture.handle))
+            .count();
+
+        let retain = ready_queue.split_off(num_ready);
+        // `split_off` works the opposite way of what we would like, leaving [0..num_ready) in
+        // place, so we need to swap `retain` with `ready_queue`.
+        let ready = ready_queue
             .into_iter()
-            .rev()
-            // Assign display order to frames that don't have one yet.
             .map(|mut picture| {
                 if DecodedHandle::display_order(&picture.handle).is_none() {
-                    let order = self.current_display_order;
-                    picture.handle.set_display_order(order);
+                    picture.handle.set_display_order(self.current_display_order);
                     self.current_display_order += 1;
                 }
-                picture
+                picture.handle
             })
-            .partition(|picture| self.backend.handle_is_ready(&picture.handle));
+            .collect();
+        self.ready_queue = BinaryHeap::from(retain);
 
-        // Keep non-ready frames in the ready queue.
-        self.ready_queue = BinaryHeap::from(retained);
-
-        ready.into_iter().map(|picture| picture.handle).collect()
+        ready
     }
 
     /// Drain the decoder, processing all pending frames.
@@ -2305,7 +2311,7 @@
     }
 
     fn block_on_one(&mut self) -> VideoDecoderResult<()> {
-        for ReadyPicture { handle, .. } in &self.ready_queue {
+        if let Some(ReadyPicture { handle, .. }) = &self.ready_queue.peek() {
             if !self.backend.handle_is_ready(handle) {
                 return self
                     .backend
diff --git a/src/decoders/vp8/decoder.rs b/src/decoders/vp8/decoder.rs
index 6846c6b..524b326 100644
--- a/src/decoders/vp8/decoder.rs
+++ b/src/decoders/vp8/decoder.rs
@@ -163,10 +163,8 @@
     }
 
     fn block_on_one(&mut self) -> Result<()> {
-        for handle in &self.ready_queue {
-            if !self.backend.handle_is_ready(handle) {
-                return self.backend.block_on_handle(handle).map_err(|e| anyhow!(e));
-            }
+        if let Some(handle) = self.ready_queue.first() {
+            return self.backend.block_on_handle(handle).map_err(|e| anyhow!(e));
         }
 
         Ok(())
@@ -174,15 +172,18 @@
 
     /// Returns the ready handles.
     fn get_ready_frames(&mut self) -> Vec<T> {
-        let ready = self
+        // Count all ready handles.
+        let num_ready = self
             .ready_queue
             .iter()
-            .filter(|&handle| self.backend.handle_is_ready(handle))
-            .cloned()
-            .collect::<Vec<_>>();
+            .take_while(|&handle| self.backend.handle_is_ready(handle))
+            .count();
 
-        self.ready_queue
-            .retain(|handle| !self.backend.handle_is_ready(handle));
+        let retain = self.ready_queue.split_off(num_ready);
+        // `split_off` works the opposite way of what we would like, leaving [0..num_ready) in
+        // place, so we need to swap `retain` with `ready_queue`.
+        let ready = std::mem::take(&mut self.ready_queue);
+        self.ready_queue = retain;
 
         ready
     }
diff --git a/src/decoders/vp9/decoder.rs b/src/decoders/vp9/decoder.rs
index 0a35a9b..8e0c7c1 100644
--- a/src/decoders/vp9/decoder.rs
+++ b/src/decoders/vp9/decoder.rs
@@ -163,10 +163,8 @@
     }
 
     fn block_on_one(&mut self) -> Result<()> {
-        for handle in &self.ready_queue {
-            if !self.backend.handle_is_ready(handle) {
-                return self.backend.block_on_handle(handle).map_err(|e| anyhow!(e));
-            }
+        if let Some(handle) = self.ready_queue.first() {
+            return self.backend.block_on_handle(handle).map_err(|e| anyhow!(e));
         }
 
         Ok(())
@@ -174,15 +172,18 @@
 
     /// Returns the ready handles.
     fn get_ready_frames(&mut self) -> Vec<T> {
-        let ready = self
+        // Count all ready handles.
+        let num_ready = self
             .ready_queue
             .iter()
-            .filter(|&handle| self.backend.handle_is_ready(handle))
-            .cloned()
-            .collect::<Vec<_>>();
+            .take_while(|&handle| self.backend.handle_is_ready(handle))
+            .count();
 
-        self.ready_queue
-            .retain(|handle| !self.backend.handle_is_ready(handle));
+        let retain = self.ready_queue.split_off(num_ready);
+        // `split_off` works the opposite way of what we would like, leaving [0..num_ready) in
+        // place, so we need to swap `retain` with `ready_queue`.
+        let ready = std::mem::take(&mut self.ready_queue);
+        self.ready_queue = retain;
 
         ready
     }