initial commit
This commit is contained in:
4
exercises/08_futures/00_intro/Cargo.toml
Normal file
4
exercises/08_futures/00_intro/Cargo.toml
Normal file
@@ -0,0 +1,4 @@
|
||||
[package]
|
||||
name = "intro_08"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
14
exercises/08_futures/00_intro/src/lib.rs
Normal file
14
exercises/08_futures/00_intro/src/lib.rs
Normal file
@@ -0,0 +1,14 @@
|
||||
fn intro() -> &'static str {
|
||||
// TODO: fix me 👇
|
||||
"I'm ready to _!"
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::intro;
|
||||
|
||||
#[test]
|
||||
fn test_intro() {
|
||||
assert_eq!(intro(), "I'm ready to learn about futures!");
|
||||
}
|
||||
}
|
||||
8
exercises/08_futures/01_async_fn/Cargo.toml
Normal file
8
exercises/08_futures/01_async_fn/Cargo.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
[package]
|
||||
name = "async_fn"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.83"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
45
exercises/08_futures/01_async_fn/src/lib.rs
Normal file
45
exercises/08_futures/01_async_fn/src/lib.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
// TODO: write an echo server that accepts incoming TCP connections and
|
||||
// echoes the received data back to the client.
|
||||
// `echo` should not return when it finishes processing a connection, but should
|
||||
// continue to accept new connections.
|
||||
//
|
||||
// Hint: you should rely on `tokio`'s structs and methods to implement the echo server.
|
||||
// In particular:
|
||||
// - `tokio::net::TcpListener::accept` to process the next incoming connection
|
||||
// - `tokio::net::TcpStream::split` to obtain a reader and a writer from the socket
|
||||
// - `tokio::io::copy` to copy data from the reader to the writer
|
||||
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_echo() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(echo(listener));
|
||||
|
||||
let requests = vec!["hello", "world", "foo", "bar"];
|
||||
|
||||
for request in requests {
|
||||
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
|
||||
let (mut reader, mut writer) = socket.split();
|
||||
|
||||
// Send the request
|
||||
writer.write_all(request.as_bytes()).await.unwrap();
|
||||
// Close the write side of the socket
|
||||
writer.shutdown().await.unwrap();
|
||||
|
||||
// Read the response
|
||||
let mut buf = Vec::with_capacity(request.len());
|
||||
reader.read_to_end(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, request.as_bytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
8
exercises/08_futures/02_spawn/Cargo.toml
Normal file
8
exercises/08_futures/02_spawn/Cargo.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
[package]
|
||||
name = "spawn"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.83"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
60
exercises/08_futures/02_spawn/src/lib.rs
Normal file
60
exercises/08_futures/02_spawn/src/lib.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
// TODO: write an echo server that accepts TCP connections on two listeners, concurrently.
|
||||
// Multiple connections (on the same listeners) should be processed concurrently.
|
||||
// The received data should be echoed back to the client.
|
||||
pub async fn echoes(first: TcpListener, second: TcpListener) -> Result<(), anyhow::Error> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::net::SocketAddr;
|
||||
use std::panic;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
async fn bind_random() -> (TcpListener, SocketAddr) {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
(listener, addr)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_echo() {
|
||||
let (first_listener, first_addr) = bind_random().await;
|
||||
let (second_listener, second_addr) = bind_random().await;
|
||||
tokio::spawn(echoes(first_listener, second_listener));
|
||||
|
||||
let requests = vec!["hello", "world", "foo", "bar"];
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
for request in requests.clone() {
|
||||
for addr in [first_addr, second_addr] {
|
||||
join_set.spawn(async move {
|
||||
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
|
||||
let (mut reader, mut writer) = socket.split();
|
||||
|
||||
// Send the request
|
||||
writer.write_all(request.as_bytes()).await.unwrap();
|
||||
// Close the write side of the socket
|
||||
writer.shutdown().await.unwrap();
|
||||
|
||||
// Read the response
|
||||
let mut buf = Vec::with_capacity(request.len());
|
||||
reader.read_to_end(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, request.as_bytes());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
while let Some(outcome) = join_set.join_next().await {
|
||||
if let Err(e) = outcome {
|
||||
if let Ok(reason) = e.try_into_panic() {
|
||||
panic::resume_unwind(reason);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
8
exercises/08_futures/03_runtime/Cargo.toml
Normal file
8
exercises/08_futures/03_runtime/Cargo.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
[package]
|
||||
name = "runtime"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.83"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
61
exercises/08_futures/03_runtime/src/lib.rs
Normal file
61
exercises/08_futures/03_runtime/src/lib.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
// TODO: Implement the `fixed_reply` function. It should accept two `TcpListener` instances,
|
||||
// accept connections on both of them concurrently, and always reply to clients by sending
|
||||
// the `Display` representation of the `reply` argument as a response.
|
||||
use std::fmt::Display;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
pub async fn fixed_reply<T>(first: TcpListener, second: TcpListener, reply: T)
|
||||
where
|
||||
// `T` cannot be cloned. How do you share it between the two server tasks?
|
||||
T: Display + Send + Sync + 'static,
|
||||
{
|
||||
todo!()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::net::SocketAddr;
|
||||
use std::panic;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
async fn bind_random() -> (TcpListener, SocketAddr) {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
(listener, addr)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_echo() {
|
||||
let (first_listener, first_addr) = bind_random().await;
|
||||
let (second_listener, second_addr) = bind_random().await;
|
||||
let reply = "Yo";
|
||||
tokio::spawn(fixed_reply(first_listener, second_listener, reply));
|
||||
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
for _ in 0..3 {
|
||||
for addr in [first_addr, second_addr] {
|
||||
join_set.spawn(async move {
|
||||
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
|
||||
let (mut reader, _) = socket.split();
|
||||
|
||||
// Read the response
|
||||
let mut buf = Vec::new();
|
||||
reader.read_to_end(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, reply.as_bytes());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
while let Some(outcome) = join_set.join_next().await {
|
||||
if let Err(e) = outcome {
|
||||
if let Ok(reason) = e.try_into_panic() {
|
||||
panic::resume_unwind(reason);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
7
exercises/08_futures/04_future/Cargo.toml
Normal file
7
exercises/08_futures/04_future/Cargo.toml
Normal file
@@ -0,0 +1,7 @@
|
||||
[package]
|
||||
name = "future"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
16
exercises/08_futures/04_future/src/lib.rs
Normal file
16
exercises/08_futures/04_future/src/lib.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
//! TODO: get the code to compile by **re-ordering** the statements
|
||||
//! in the `example` function. You're not allowed to change the
|
||||
//! `spawner` function nor what each line does in `example`.
|
||||
//! You can wrap existing statements in blocks `{}` if needed.
|
||||
use std::rc::Rc;
|
||||
use tokio::task::yield_now;
|
||||
|
||||
fn spawner() {
|
||||
tokio::spawn(example());
|
||||
}
|
||||
|
||||
async fn example() {
|
||||
let non_send = Rc::new(1);
|
||||
yield_now().await;
|
||||
println!("{}", non_send);
|
||||
}
|
||||
8
exercises/08_futures/05_blocking/Cargo.toml
Normal file
8
exercises/08_futures/05_blocking/Cargo.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
[package]
|
||||
name = "blocking"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.83"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
71
exercises/08_futures/05_blocking/src/lib.rs
Normal file
71
exercises/08_futures/05_blocking/src/lib.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
// TODO: the `echo` server uses non-async primitives.
|
||||
// When running the tests, you should observe that it hangs, due to a
|
||||
// deadlock between the caller and the server.
|
||||
// Use `spawn_blocking` inside `echo` to resolve the issue.
|
||||
use std::io::{Read, Write};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
|
||||
loop {
|
||||
let (socket, _) = listener.accept().await?;
|
||||
let mut socket = socket.into_std()?;
|
||||
socket.set_nonblocking(false)?;
|
||||
let mut buffer = Vec::new();
|
||||
socket.read_to_end(&mut buffer)?;
|
||||
socket.write_all(&buffer)?;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::net::SocketAddr;
|
||||
use std::panic;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
async fn bind_random() -> (TcpListener, SocketAddr) {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
(listener, addr)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_echo() {
|
||||
let (listener, addr) = bind_random().await;
|
||||
tokio::spawn(echo(listener));
|
||||
|
||||
let requests = vec![
|
||||
"hello here we go with a long message",
|
||||
"world",
|
||||
"foo",
|
||||
"bar",
|
||||
];
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
for request in requests {
|
||||
join_set.spawn(async move {
|
||||
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
|
||||
let (mut reader, mut writer) = socket.split();
|
||||
|
||||
// Send the request
|
||||
writer.write_all(request.as_bytes()).await.unwrap();
|
||||
// Close the write side of the socket
|
||||
writer.shutdown().await.unwrap();
|
||||
|
||||
// Read the response
|
||||
let mut buf = Vec::with_capacity(request.len());
|
||||
reader.read_to_end(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, request.as_bytes());
|
||||
});
|
||||
}
|
||||
|
||||
while let Some(outcome) = join_set.join_next().await {
|
||||
if let Err(e) = outcome {
|
||||
if let Ok(reason) = e.try_into_panic() {
|
||||
panic::resume_unwind(reason);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
[package]
|
||||
name = "async_locks"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
53
exercises/08_futures/06_async_aware_primitives/src/lib.rs
Normal file
53
exercises/08_futures/06_async_aware_primitives/src/lib.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
/// TODO: the code below will deadlock because it's using std's channels,
|
||||
/// which are not async-aware.
|
||||
/// Rewrite it to use `tokio`'s channels primitive (you'll have to touch
|
||||
/// the testing code too, yes).
|
||||
///
|
||||
/// Can you understand the sequence of events that can lead to a deadlock?
|
||||
use std::sync::mpsc;
|
||||
|
||||
pub struct Message {
|
||||
payload: String,
|
||||
response_channel: mpsc::Sender<Message>,
|
||||
}
|
||||
|
||||
/// Replies with `pong` to any message it receives, setting up a new
|
||||
/// channel to continue communicating with the caller.
|
||||
pub async fn pong(mut receiver: mpsc::Receiver<Message>) {
|
||||
loop {
|
||||
if let Ok(msg) = receiver.recv() {
|
||||
println!("Pong received: {}", msg.payload);
|
||||
let (sender, new_receiver) = mpsc::channel();
|
||||
msg.response_channel
|
||||
.send(Message {
|
||||
payload: "pong".into(),
|
||||
response_channel: sender,
|
||||
})
|
||||
.unwrap();
|
||||
receiver = new_receiver;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{pong, Message};
|
||||
use std::sync::mpsc;
|
||||
|
||||
#[tokio::test]
|
||||
async fn ping() {
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
let (response_sender, response_receiver) = mpsc::channel();
|
||||
sender
|
||||
.send(Message {
|
||||
payload: "pong".into(),
|
||||
response_channel: response_sender,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
tokio::spawn(pong(receiver));
|
||||
|
||||
let answer = response_receiver.recv().unwrap().payload;
|
||||
assert_eq!(answer, "pong");
|
||||
}
|
||||
}
|
||||
7
exercises/08_futures/07_cancellation/Cargo.toml
Normal file
7
exercises/08_futures/07_cancellation/Cargo.toml
Normal file
@@ -0,0 +1,7 @@
|
||||
[package]
|
||||
name = "cancellation"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
51
exercises/08_futures/07_cancellation/src/lib.rs
Normal file
51
exercises/08_futures/07_cancellation/src/lib.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
// TODO: fix the `assert_eq` at the end of the tests.
|
||||
// Do you understand why that's the resulting output?
|
||||
use std::time::Duration;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
pub async fn run(listener: TcpListener, n_messages: usize, timeout: Duration) -> Vec<u8> {
|
||||
let mut buffer = Vec::new();
|
||||
for _ in 0..n_messages {
|
||||
let (mut stream, _) = listener.accept().await.unwrap();
|
||||
let _ = tokio::time::timeout(timeout, async {
|
||||
stream.read_to_end(&mut buffer).await.unwrap();
|
||||
})
|
||||
.await;
|
||||
}
|
||||
buffer
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
#[tokio::test]
|
||||
async fn ping() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let messages = vec!["hello", "from", "this", "task"];
|
||||
let timeout = Duration::from_millis(20);
|
||||
let handle = tokio::spawn(run(listener, messages.len(), timeout.clone()));
|
||||
|
||||
for message in messages {
|
||||
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
|
||||
let (_, mut writer) = socket.split();
|
||||
|
||||
let (beginning, end) = message.split_at(message.len() / 2);
|
||||
|
||||
// Send first half
|
||||
writer.write_all(beginning.as_bytes()).await.unwrap();
|
||||
tokio::time::sleep(timeout * 2).await;
|
||||
writer.write_all(end.as_bytes()).await.unwrap();
|
||||
|
||||
// Close the write side of the socket
|
||||
let _ = writer.shutdown().await;
|
||||
}
|
||||
|
||||
let buffered = handle.await.unwrap();
|
||||
let buffered = std::str::from_utf8(&buffered).unwrap();
|
||||
assert_eq!(buffered, "");
|
||||
}
|
||||
}
|
||||
7
exercises/08_futures/08_outro/Cargo.toml
Normal file
7
exercises/08_futures/08_outro/Cargo.toml
Normal file
@@ -0,0 +1,7 @@
|
||||
[package]
|
||||
name = "outro_08"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
10
exercises/08_futures/08_outro/src/lib.rs
Normal file
10
exercises/08_futures/08_outro/src/lib.rs
Normal file
@@ -0,0 +1,10 @@
|
||||
// This is our last exercise. Let's go down a more unstructured path!
|
||||
// Try writing an **asynchronous REST API** to expose the functionality
|
||||
// of the ticket management system we built throughout the course.
|
||||
// It should expose endpoints to:
|
||||
// - Create a ticket
|
||||
// - Retrieve ticket details
|
||||
// - Patch a ticket
|
||||
//
|
||||
// Use Rust's package registry, crates.io, to find the dependencies you need
|
||||
// (if any) to build this system.
|
||||
Reference in New Issue
Block a user