Upgrade rust/crates/async-stream-impl to 0.3.1

Test: make
Change-Id: I2357f147474ec1da932b5c9eb11e3633c61bffbd
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..4a76e44
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,5 @@
+{
+  "git": {
+    "sha1": "8c22349b9367b981ad08db573d8d906069e66d8c"
+  }
+}
diff --git a/Android.bp b/Android.bp
index a212b3b..839ba5f 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1,4 +1,5 @@
 // This file is generated by cargo2android.py --run --dependencies --device.
+// Do not modify this file as changes will be overridden on upgrade.
 
 package {
     default_applicable_licenses: [
@@ -32,7 +33,7 @@
 }
 
 // dependent_library ["feature_list"]
-//   proc-macro2-1.0.24 "default,proc-macro"
-//   quote-1.0.8 "default,proc-macro"
-//   syn-1.0.60 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
-//   unicode-xid-0.2.1 "default"
+//   proc-macro2-1.0.26 "default,proc-macro"
+//   quote-1.0.9 "default,proc-macro"
+//   syn-1.0.72 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
+//   unicode-xid-0.2.2 "default"
diff --git a/Cargo.toml b/Cargo.toml
index df59eae..ca12ae9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
 [package]
 edition = "2018"
 name = "async-stream-impl"
-version = "0.3.0"
+version = "0.3.1"
 authors = ["Carl Lerche <me@carllerche.com>"]
 description = "proc macros for async-stream crate"
 homepage = "https://github.com/tokio-rs/async-stream"
-documentation = "https://docs.rs/async-stream-impl/0.3.0/async-stream-impl"
+documentation = "https://docs.rs/async-stream-impl/0.3"
 license = "MIT"
 repository = "https://github.com/tokio-rs/async-stream"
 
@@ -39,5 +39,5 @@
 version = "0.3"
 
 [dev-dependencies.tokio]
-version = "0.2"
+version = "1"
 features = ["full"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index dd6d9d9..6682921 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,11 +1,11 @@
 [package]
 name = "async-stream-impl"
-version = "0.3.0"
+version = "0.3.1"
 edition = "2018"
 license = "MIT"
 authors = ["Carl Lerche <me@carllerche.com>"]
 description = "proc macros for async-stream crate"
-documentation = "https://docs.rs/async-stream-impl/0.3.0/async-stream-impl"
+documentation = "https://docs.rs/async-stream-impl/0.3"
 homepage = "https://github.com/tokio-rs/async-stream"
 repository = "https://github.com/tokio-rs/async-stream"
 
@@ -18,7 +18,7 @@
 quote = "1"
 
 [dev-dependencies]
-# async-stream = { version = "0.3.0", path = "../async-stream" }
+async-stream = { path = "../async-stream" }
 futures-core = "0.3"
 futures-util = "0.3"
-tokio = { version = "0.2", features = ["full"] }
+tokio = { version = "1", features = ["full"] }
diff --git a/METADATA b/METADATA
index 8e2e26d..adebd05 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/async-stream-impl/async-stream-impl-0.3.0.crate"
+    value: "https://static.crates.io/crates/async-stream-impl/async-stream-impl-0.3.1.crate"
   }
-  version: "0.3.0"
+  version: "0.3.1"
   license_type: NOTICE
   last_upgrade_date {
     year: 2021
-    month: 2
-    day: 8
+    month: 5
+    day: 19
   }
 }
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 8fccfe0..73ad094 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -2,16 +2,112 @@
 {
   "presubmit": [
     {
+      "name": "tokio-test_device_test_src_lib"
+    },
+    {
       "name": "tokio-test_device_test_tests_block_on"
     },
     {
       "name": "tokio-test_device_test_tests_io"
     },
     {
-      "name": "tokio-test_device_test_src_lib"
+      "name": "tokio-test_device_test_tests_macros"
     },
     {
-      "name": "tokio-test_device_test_tests_macros"
+      "name": "tokio_device_test_tests_buffered"
+    },
+    {
+      "name": "tokio_device_test_tests_io_async_read"
+    },
+    {
+      "name": "tokio_device_test_tests_io_copy_bidirectional"
+    },
+    {
+      "name": "tokio_device_test_tests_io_lines"
+    },
+    {
+      "name": "tokio_device_test_tests_io_mem_stream"
+    },
+    {
+      "name": "tokio_device_test_tests_io_read"
+    },
+    {
+      "name": "tokio_device_test_tests_io_read_buf"
+    },
+    {
+      "name": "tokio_device_test_tests_io_read_to_end"
+    },
+    {
+      "name": "tokio_device_test_tests_io_take"
+    },
+    {
+      "name": "tokio_device_test_tests_io_write"
+    },
+    {
+      "name": "tokio_device_test_tests_io_write_all"
+    },
+    {
+      "name": "tokio_device_test_tests_io_write_buf"
+    },
+    {
+      "name": "tokio_device_test_tests_io_write_int"
+    },
+    {
+      "name": "tokio_device_test_tests_macros_join"
+    },
+    {
+      "name": "tokio_device_test_tests_no_rt"
+    },
+    {
+      "name": "tokio_device_test_tests_rt_basic"
+    },
+    {
+      "name": "tokio_device_test_tests_rt_threaded"
+    },
+    {
+      "name": "tokio_device_test_tests_sync_barrier"
+    },
+    {
+      "name": "tokio_device_test_tests_sync_broadcast"
+    },
+    {
+      "name": "tokio_device_test_tests_sync_errors"
+    },
+    {
+      "name": "tokio_device_test_tests_sync_mpsc"
+    },
+    {
+      "name": "tokio_device_test_tests_sync_mutex_owned"
+    },
+    {
+      "name": "tokio_device_test_tests_sync_rwlock"
+    },
+    {
+      "name": "tokio_device_test_tests_sync_watch"
+    },
+    {
+      "name": "tokio_device_test_tests_task_local"
+    },
+    {
+      "name": "tokio_device_test_tests_task_local_set"
+    },
+    {
+      "name": "tokio_device_test_tests_tcp_accept"
+    },
+    {
+      "name": "tokio_device_test_tests_tcp_echo"
+    },
+    {
+      "name": "tokio_device_test_tests_tcp_into_std"
+    },
+    {
+      "name": "tokio_device_test_tests_tcp_shutdown"
+    },
+    {
+      "name": "tokio_device_test_tests_time_rt"
+    },
+    {
+      "name": "tokio_device_test_tests_uds_split"
     }
   ]
 }
diff --git a/src/lib.rs b/src/lib.rs
index 9081ecf..bcd7b3f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,47 +1,52 @@
 extern crate proc_macro;
 use proc_macro::TokenStream;
-use proc_macro2::{Delimiter, Group, TokenStream as TokenStream2, TokenTree};
+use proc_macro2::{Group, TokenStream as TokenStream2, TokenTree};
 use quote::quote;
+use syn::parse::Parser;
 use syn::visit_mut::VisitMut;
 
-struct Scrub {
-    is_xforming: bool,
+struct Scrub<'a> {
+    /// Whether the stream is a try stream.
     is_try: bool,
+    /// The unit expression, `()`.
     unit: Box<syn::Expr>,
-    num_yield: u32,
+    has_yielded: bool,
+    crate_path: &'a TokenStream2,
 }
 
-fn parse_input(input: TokenStream) -> syn::Result<Vec<syn::Stmt>> {
-    let input = replace_for_await(input.into());
-    // syn does not provide a way to parse `Vec<Stmt>` directly from `TokenStream`,
-    // so wrap input in a brace and then parse it as a block.
-    let input = TokenStream2::from(TokenTree::Group(Group::new(Delimiter::Brace, input)));
-    let syn::Block { stmts, .. } = syn::parse2(input)?;
-
-    Ok(stmts)
+fn parse_input(input: TokenStream) -> syn::Result<(TokenStream2, Vec<syn::Stmt>)> {
+    let mut input = TokenStream2::from(input).into_iter();
+    let crate_path = match input.next().unwrap() {
+        TokenTree::Group(group) => group.stream(),
+        _ => panic!(),
+    };
+    let stmts = syn::Block::parse_within.parse2(replace_for_await(input))?;
+    Ok((crate_path, stmts))
 }
 
-impl VisitMut for Scrub {
-    fn visit_expr_mut(&mut self, i: &mut syn::Expr) {
-        if !self.is_xforming {
-            syn::visit_mut::visit_expr_mut(self, i);
-            return;
+impl<'a> Scrub<'a> {
+    fn new(is_try: bool, crate_path: &'a TokenStream2) -> Self {
+        Self {
+            is_try,
+            unit: syn::parse_quote!(()),
+            has_yielded: false,
+            crate_path,
         }
+    }
+}
 
+impl VisitMut for Scrub<'_> {
+    fn visit_expr_mut(&mut self, i: &mut syn::Expr) {
         match i {
             syn::Expr::Yield(yield_expr) => {
-                self.num_yield += 1;
+                self.has_yielded = true;
 
-                let value_expr = if let Some(ref e) = yield_expr.expr {
-                    e
-                } else {
-                    &self.unit
-                };
+                let value_expr = yield_expr.expr.as_ref().unwrap_or(&self.unit);
 
                 // let ident = &self.yielder;
 
                 *i = if self.is_try {
-                    syn::parse_quote! { __yield_tx.send(Ok(#value_expr)).await }
+                    syn::parse_quote! { __yield_tx.send(::core::result::Result::Ok(#value_expr)).await }
                 } else {
                     syn::parse_quote! { __yield_tx.send(#value_expr).await }
                 };
@@ -53,19 +58,16 @@
 
                 *i = syn::parse_quote! {
                     match #e {
-                        Ok(v) => v,
-                        Err(e) => {
-                            __yield_tx.send(Err(e.into())).await;
+                        ::core::result::Result::Ok(v) => v,
+                        ::core::result::Result::Err(e) => {
+                            __yield_tx.send(::core::result::Result::Err(e.into())).await;
                             return;
                         }
                     }
                 };
             }
             syn::Expr::Closure(_) | syn::Expr::Async(_) => {
-                let prev = self.is_xforming;
-                self.is_xforming = false;
-                syn::visit_mut::visit_expr_mut(self, i);
-                self.is_xforming = prev;
+                // Don't transform inner closures or async blocks.
             }
             syn::Expr::ForLoop(expr) => {
                 syn::visit_mut::visit_expr_for_loop_mut(self, expr);
@@ -88,6 +90,7 @@
                     return;
                 }
 
+                let crate_path = self.crate_path;
                 *i = syn::parse_quote! {{
                     let mut __pinned = #expr;
                     let mut __pinned = unsafe {
@@ -95,7 +98,7 @@
                     };
                     #label
                     loop {
-                        let #pat = match ::async_stream::reexport::next(&mut __pinned).await {
+                        let #pat = match #crate_path::reexport::next(&mut __pinned).await {
                             ::core::option::Option::Some(e) => e,
                             ::core::option::Option::None => break,
                         };
@@ -107,70 +110,38 @@
         }
     }
 
-    fn visit_item_mut(&mut self, i: &mut syn::Item) {
-        let prev = self.is_xforming;
-        self.is_xforming = false;
-        syn::visit_mut::visit_item_mut(self, i);
-        self.is_xforming = prev;
+    fn visit_item_mut(&mut self, _: &mut syn::Item) {
+        // Don't transform inner items.
     }
 }
 
-/// Asynchronous stream
-///
-/// See [crate](index.html) documentation for more details.
-///
-/// # Examples
-///
-/// ```rust
-/// use async_stream::stream;
-///
-/// use futures_util::pin_mut;
-/// use futures_util::stream::StreamExt;
-///
-/// #[tokio::main]
-/// async fn main() {
-///     let s = stream! {
-///         for i in 0..3 {
-///             yield i;
-///         }
-///     };
-///
-///     pin_mut!(s); // needed for iteration
-///
-///     while let Some(value) = s.next().await {
-///         println!("got {}", value);
-///     }
-/// }
-/// ```
+/// The first token tree in the stream must be a group containing the path to the `async-stream`
+/// crate.
 #[proc_macro]
-pub fn stream(input: TokenStream) -> TokenStream {
-    let mut stmts = match parse_input(input) {
+#[doc(hidden)]
+pub fn stream_inner(input: TokenStream) -> TokenStream {
+    let (crate_path, mut stmts) = match parse_input(input) {
         Ok(x) => x,
         Err(e) => return e.to_compile_error().into(),
     };
 
-    let mut scrub = Scrub {
-        is_xforming: true,
-        is_try: false,
-        unit: syn::parse_quote!(()),
-        num_yield: 0,
-    };
+    let mut scrub = Scrub::new(false, &crate_path);
 
-    for mut stmt in &mut stmts[..] {
+    for mut stmt in &mut stmts {
         scrub.visit_stmt_mut(&mut stmt);
     }
 
-    let dummy_yield = if scrub.num_yield == 0 {
+    let dummy_yield = if scrub.has_yielded {
+        None
+    } else {
         Some(quote!(if false {
             __yield_tx.send(()).await;
         }))
-    } else {
-        None
     };
 
     quote!({
-        let (mut __yield_tx, __yield_rx) = ::async_stream::yielder::pair();
-        ::async_stream::AsyncStream::new(__yield_rx, async move {
+        let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair();
+        #crate_path::AsyncStream::new(__yield_rx, async move {
             #dummy_yield
             #(#stmts)*
         })
@@ -178,64 +149,33 @@
     .into()
 }
 
-/// Asynchronous fallible stream
-///
-/// See [crate](index.html) documentation for more details.
-///
-/// # Examples
-///
-/// ```rust
-/// use tokio::net::{TcpListener, TcpStream};
-///
-/// use async_stream::try_stream;
-/// use futures_core::stream::Stream;
-///
-/// use std::io;
-/// use std::net::SocketAddr;
-///
-/// fn bind_and_accept(addr: SocketAddr)
-///     -> impl Stream<Item = io::Result<TcpStream>>
-/// {
-///     try_stream! {
-///         let mut listener = TcpListener::bind(addr).await?;
-///
-///         loop {
-///             let (stream, addr) = listener.accept().await?;
-///             println!("received on {:?}", addr);
-///             yield stream;
-///         }
-///     }
-/// }
-/// ```
+/// The first token tree in the stream must be a group containing the path to the `async-stream`
+/// crate.
 #[proc_macro]
-pub fn try_stream(input: TokenStream) -> TokenStream {
-    let mut stmts = match parse_input(input) {
+#[doc(hidden)]
+pub fn try_stream_inner(input: TokenStream) -> TokenStream {
+    let (crate_path, mut stmts) = match parse_input(input) {
         Ok(x) => x,
         Err(e) => return e.to_compile_error().into(),
     };
 
-    let mut scrub = Scrub {
-        is_xforming: true,
-        is_try: true,
-        unit: syn::parse_quote!(()),
-        num_yield: 0,
-    };
+    let mut scrub = Scrub::new(true, &crate_path);
 
-    for mut stmt in &mut stmts[..] {
+    for mut stmt in &mut stmts {
         scrub.visit_stmt_mut(&mut stmt);
     }
 
-    let dummy_yield = if scrub.num_yield == 0 {
+    let dummy_yield = if scrub.has_yielded {
+        None
+    } else {
         Some(quote!(if false {
             __yield_tx.send(()).await;
         }))
-    } else {
-        None
     };
 
     quote!({
-        let (mut __yield_tx, __yield_rx) = ::async_stream::yielder::pair();
-        ::async_stream::AsyncStream::new(__yield_rx, async move {
+        let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair();
+        #crate_path::AsyncStream::new(__yield_rx, async move {
             #dummy_yield
             #(#stmts)*
         })
@@ -243,7 +183,8 @@
     .into()
 }
 
-fn replace_for_await(input: TokenStream2) -> TokenStream2 {
+/// Replace `for await` with `#[await] for`, which will be later transformed into a `next` loop.
+fn replace_for_await(input: impl IntoIterator<Item = TokenTree>) -> TokenStream2 {
     let mut input = input.into_iter().peekable();
     let mut tokens = Vec::new();