//! The low level client side API
//!
//! Note that while using this API directly is fine, a simpler way to get data
//! to a store is to use the [`crate::api::remote`] API, in particular the
//! [`crate::api::remote::Remote::fetch`] function to download data to your
//! local store.
//!
//! To get data, create a connection using an [`iroh::Endpoint`].
//!
//! Create a [`crate::protocol::GetRequest`] describing the data you want to get.
//!
//! Then create a state machine using [fsm::start] and
//! drive it to completion by calling next on each state.
//!
//! For some states you have to provide additional arguments when calling next,
//! or you can choose to finish early.
//!
//! [iroh]: https://docs.rs/iroh
use std::fmt::{self, Debug};

use cyber_bao::{io::BaoContentItem, ChunkNum};
use fsm::RequestCounters;
use n0_error::Result;
use n0_future::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use tracing::{debug, error};

use crate::{
    protocol::ChunkRangesSeq,
    store::IROH_BLOCK_SIZE,
    util::{RecvStream, SendStream},
    Hash,
};

mod error;
pub mod request;
pub use error::{GetError, GetResult};

type DefaultReader = iroh::endpoint::RecvStream;
type DefaultWriter = iroh::endpoint::SendStream;

pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    pub connection_id: u64,
    pub t0: Instant,
    pub recv: R,
    pub send: W,
}

impl<R: RecvStream, W: SendStream> Debug for StreamPair<R, W> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("StreamPair")
            .field("connection_id", &self.connection_id)
            .field("t0", &self.t0)
            .finish()
    }
}

impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
    pub fn new(connection_id: u64, recv: R, send: W) -> Self {
        Self {
            t0: Instant::now(),
            recv,
            send,
            connection_id,
        }
    }
}

/// Stats about the transfer.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// Counters
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// The time it took to transfer the data
    pub elapsed: Duration,
}

impl Stats {
    /// Transfer rate in megabits per second
    pub fn mbits(&self) -> f64 {
        let data_len_bit = self.total_bytes_read() * 8;
        data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
    }

    pub fn total_bytes_read(&self) -> u64 {
        self.payload_bytes_read + self.other_bytes_read
    }

    pub fn combine(&mut self, that: &Stats) {
        self.payload_bytes_written += that.payload_bytes_written;
        self.other_bytes_written += that.other_bytes_written;
        self.payload_bytes_read += that.payload_bytes_read;
        self.other_bytes_read += that.other_bytes_read;
        self.elapsed += that.elapsed;
    }
}

/// Finite state machine for get responses.
///
/// This is the low level API for getting data from a peer.
#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
pub mod fsm {
    use std::{io, result};

    use cyber_bao::{
        io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
        BaoTree, ChunkRanges, TreeNode,
    };
    use derive_more::From;
    use iroh::endpoint::Connection;
    use iroh_io::AsyncSliceWriter;
    use n0_error::{e, stack_error, AnyError};

    use super::*;
    use crate::{
        protocol::{
            GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
        },
        util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
    };

    self_cell::self_cell! {
        struct RangesIterInner {
            owner: ChunkRangesSeq,
            #[not_covariant]
            dependent: NonEmptyRequestRangeSpecIter,
        }
    }

    /// The entry point of the get response machine
    pub fn start(
        connection: Connection,
        request: GetRequest,
        counters: RequestCounters,
    ) -> AtInitial {
        AtInitial::new(connection, request, counters)
    }

    /// Start with a get many request. Todo: turn this into distinct states.
    pub async fn start_get_many(
        connection: Connection,
        request: GetManyRequest,
        counters: RequestCounters,
    ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
        let start = Instant::now();
        let (mut writer, reader) = connection
            .open_bi()
            .await
            .map_err(|e| e!(InitialNextError::Open, e.into()))?;
        let request = Request::GetMany(request);
        let request_bytes = postcard::to_stdvec(&request)
            .map_err(|source| e!(GetError::BadRequest, AnyError::from_std(source)))?;
        writer
            .send_bytes(request_bytes.into())
            .await
            .map_err(|source| e!(ConnectedNextError::Write, source))?;
        let Request::GetMany(request) = request else {
            unreachable!();
        };
        let mut ranges_iter = RangesIter::new(request.ranges.clone());
        let first_item = ranges_iter.next();
        let misc = Box::new(Misc {
            counters,
            start,
            ranges_iter,
        });
        Ok(match first_item {
            Some((child_offset, child_ranges)) => Ok(AtStartChild {
                ranges: child_ranges,
                reader,
                misc,
                offset: child_offset,
            }),
            None => Err(AtClosing::new(misc, reader, true)),
        })
    }

    /// Owned iterator for the ranges in a request
    ///
    /// We need an owned iterator for a fsm style API, otherwise we would have
    /// to drag a lifetime around every single state.
    struct RangesIter(RangesIterInner);

    impl fmt::Debug for RangesIter {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_struct("RangesIter").finish()
        }
    }

    impl RangesIter {
        pub fn new(owner: ChunkRangesSeq) -> Self {
            Self(RangesIterInner::new(owner, |owner| {
                owner.iter_non_empty_infinite()
            }))
        }

        pub fn offset(&self) -> u64 {
            self.0.with_dependent(|_owner, iter| iter.offset())
        }
    }

    impl Iterator for RangesIter {
        type Item = (u64, ChunkRanges);

        fn next(&mut self) -> Option<Self::Item> {
            self.0.with_dependent_mut(|_owner, iter| {
                iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
            })
        }
    }

    /// Initial state of the get response machine
    #[derive(Debug)]
    pub struct AtInitial {
        connection: Connection,
        request: GetRequest,
        counters: RequestCounters,
    }

    impl AtInitial {
        /// Create a new get response
        ///
        /// `connection` is an existing connection
        /// `request` is the request to be sent
        pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
            Self {
                connection,
                request,
                counters,
            }
        }

        /// Initiate a new bidi stream to use for the get response
        pub async fn next(self) -> Result<AtConnected, InitialNextError> {
            let start = Instant::now();
            let (writer, reader) = self
                .connection
                .open_bi()
                .await
                .map_err(|e| e!(InitialNextError::Open, e.into()))?;
            Ok(AtConnected {
                start,
                reader,
                writer,
                request: self.request,
                counters: self.counters,
            })
        }
    }

    /// Error that you can get from [`AtInitial::next`]
    #[stack_error(derive, add_meta, from_sources)]
    pub enum InitialNextError {
        #[error("open: {source}")]
        Open {
            #[error(std_err)]
            source: io::Error,
        },
    }

    /// State of the get response machine after the handshake has been sent
    #[derive(Debug)]
    pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
        start: Instant,
        reader: R,
        writer: W,
        request: GetRequest,
        counters: RequestCounters,
    }

    /// Possible next states after the handshake has been sent
    #[derive(Debug, From)]
    pub enum ConnectedNext<R: RecvStream = DefaultReader> {
        /// First response is either a collection or a single blob
        StartRoot(AtStartRoot<R>),
        /// First response is a child
        StartChild(AtStartChild<R>),
        /// Request is empty
        Closing(AtClosing<R>),
    }

    /// Error that you can get from [`AtConnected::next`]
    #[stack_error(derive, add_meta)]
    pub enum ConnectedNextError {
        /// Error when serializing the request
        #[error("postcard ser: {source}")]
        PostcardSer {
            #[error(std_err)]
            source: postcard::Error,
        },
        /// The serialized request is too long to be sent
        #[error("request too big")]
        RequestTooBig {},
        /// Error when writing the request to the [`SendStream`].
        #[error("write: {source}")]
        Write {
            #[error(std_err)]
            source: io::Error,
        },
    }

    impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
        pub fn new(
            start: Instant,
            reader: R,
            writer: W,
            request: GetRequest,
            counters: RequestCounters,
        ) -> Self {
            Self {
                start,
                reader,
                writer,
                request,
                counters,
            }
        }

        /// Send the request and move to the next state
        ///
        /// The next state will be either `StartRoot` or `StartChild` depending on whether
        /// the request requests part of the collection or not.
        ///
        /// If the request is empty, this can also move directly to `Finished`.
        pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
            let Self {
                start,
                reader,
                mut writer,
                mut request,
                mut counters,
            } = self;
            // 1. Send Request
            counters.other_bytes_written += {
                debug!("sending request");
                let wrapped = Request::Get(request);
                let request_bytes = postcard::to_stdvec(&wrapped)
                    .map_err(|source| e!(ConnectedNextError::PostcardSer, source))?;
                let Request::Get(x) = wrapped else {
                    unreachable!();
                };
                request = x;

                if request_bytes.len() > MAX_MESSAGE_SIZE {
                    return Err(e!(ConnectedNextError::RequestTooBig));
                }

                // write the request itself
                let len = request_bytes.len() as u64;
                writer
                    .send_bytes(request_bytes.into())
                    .await
                    .map_err(|source| e!(ConnectedNextError::Write, source))?;
                writer
                    .sync()
                    .await
                    .map_err(|source| e!(ConnectedNextError::Write, source))?;
                len
            };

            // 2. Finish writing before expecting a response
            drop(writer);

            let hash = request.hash;
            let ranges_iter = RangesIter::new(request.ranges);
            // this is in a box so we don't have to memcpy it on every state transition
            let mut misc = Box::new(Misc {
                counters,
                start,
                ranges_iter,
            });
            Ok(match misc.ranges_iter.next() {
                Some((offset, ranges)) => {
                    if offset == 0 {
                        AtStartRoot {
                            reader,
                            ranges,
                            misc,
                            hash,
                        }
                        .into()
                    } else {
                        AtStartChild {
                            reader,
                            ranges,
                            misc,
                            offset,
                        }
                        .into()
                    }
                }
                None => AtClosing::new(misc, reader, true).into(),
            })
        }
    }

    /// State of the get response when we start reading a collection
    #[derive(Debug)]
    pub struct AtStartRoot<R: RecvStream = DefaultReader> {
        ranges: ChunkRanges,
        reader: R,
        misc: Box<Misc>,
        hash: Hash,
    }

    /// State of the get response when we start reading a child
    #[derive(Debug)]
    pub struct AtStartChild<R: RecvStream = DefaultReader> {
        ranges: ChunkRanges,
        reader: R,
        misc: Box<Misc>,
        offset: u64,
    }

    impl<R: RecvStream> AtStartChild<R> {
        /// The offset of the child we are currently reading
        ///
        /// This must be used to determine the hash needed to call next.
        /// If this is larger than the number of children in the collection,
        /// you can call finish to stop reading the response.
        pub fn offset(&self) -> u64 {
            self.offset
        }

        /// The ranges we have requested for the child
        pub fn ranges(&self) -> &ChunkRanges {
            &self.ranges
        }

        /// Go into the next state, reading the header
        ///
        /// This requires passing in the hash of the child for validation
        pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
            AtBlobHeader {
                reader: self.reader,
                ranges: self.ranges,
                misc: self.misc,
                hash,
            }
        }

        /// Finish the get response without reading further
        ///
        /// This is used if you know that there are no more children from having
        /// read the collection, or when you want to stop reading the response
        /// early.
        pub fn finish(self) -> AtClosing<R> {
            AtClosing::new(self.misc, self.reader, false)
        }
    }

    impl<R: RecvStream> AtStartRoot<R> {
        /// The ranges we have requested for the child
        pub fn ranges(&self) -> &ChunkRanges {
            &self.ranges
        }

        /// Hash of the root blob
        pub fn hash(&self) -> Hash {
            self.hash
        }

        /// Go into the next state, reading the header
        ///
        /// For the collection we already know the hash, since it was part of the request
        pub fn next(self) -> AtBlobHeader<R> {
            AtBlobHeader {
                reader: self.reader,
                ranges: self.ranges,
                hash: self.hash,
                misc: self.misc,
            }
        }

        /// Finish the get response without reading further
        pub fn finish(self) -> AtClosing<R> {
            AtClosing::new(self.misc, self.reader, false)
        }
    }

    /// State before reading a size header
    #[derive(Debug)]
    pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
        ranges: ChunkRanges,
        reader: R,
        misc: Box<Misc>,
        hash: Hash,
    }

    /// Error that you can get from [`AtBlobHeader::next`]
    #[stack_error(derive, add_meta)]
    pub enum AtBlobHeaderNextError {
        /// Eof when reading the size header
        ///
        /// This indicates that the provider does not have the requested data.
        #[error("not found")]
        NotFound {},
        /// Generic io error
        #[error("io: {source}")]
        Read {
            #[error(std_err)]
            source: io::Error,
        },
    }

    impl From<AtBlobHeaderNextError> for io::Error {
        fn from(cause: AtBlobHeaderNextError) -> Self {
            match cause {
                AtBlobHeaderNextError::NotFound { .. } => {
                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
                }
                AtBlobHeaderNextError::Read { source, .. } => source,
            }
        }
    }

    impl<R: RecvStream> AtBlobHeader<R> {
        /// Read the size header, returning it and going into the `Content` state.
        pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
            let mut size = [0; 8];
            self.reader.recv_exact(&mut size).await.map_err(|cause| {
                if cause.kind() == io::ErrorKind::UnexpectedEof {
                    e!(AtBlobHeaderNextError::NotFound)
                } else {
                    e!(AtBlobHeaderNextError::Read, cause)
                }
            })?;
            self.misc.other_bytes_read += 8;
            let size = u64::from_le_bytes(size);
            let stream = ResponseDecoder::new(
                self.hash.into(),
                self.ranges,
                BaoTree::new(size, IROH_BLOCK_SIZE),
                RecvStreamAsyncStreamReader::new(self.reader),
            );
            Ok((
                AtBlobContent {
                    stream,
                    misc: self.misc,
                },
                size,
            ))
        }

        /// Drain the response and throw away the result
        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
            let (content, _size) = self.next().await?;
            content.drain().await
        }

        /// Concatenate the entire response into a vec
        ///
        /// For a request that does not request the complete blob, this will just
        /// concatenate the ranges that were requested.
        pub async fn concatenate_into_vec(
            self,
        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
            let (content, _size) = self.next().await?;
            content.concatenate_into_vec().await
        }

        /// Write the entire blob to a slice writer.
        pub async fn write_all<D: AsyncSliceWriter>(
            self,
            data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError> {
            let (content, _size) = self.next().await?;
            let res = content.write_all(data).await?;
            Ok(res)
        }

        /// Write the entire blob to a slice writer and to an optional outboard.
        ///
        /// The outboard is only written to if the blob is larger than a single
        /// chunk group.
        pub async fn write_all_with_outboard<D, O>(
            self,
            outboard: Option<O>,
            data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
            O: OutboardMut,
        {
            let (content, _size) = self.next().await?;
            let res = content.write_all_with_outboard(outboard, data).await?;
            Ok(res)
        }

        /// The hash of the blob we are reading.
        pub fn hash(&self) -> Hash {
            self.hash
        }

        /// The ranges we have requested for the current hash.
        pub fn ranges(&self) -> &ChunkRanges {
            &self.ranges
        }

        /// The current offset of the blob we are reading.
        pub fn offset(&self) -> u64 {
            self.misc.ranges_iter.offset()
        }
    }

    /// State while we are reading content
    #[derive(Debug)]
    pub struct AtBlobContent<R: RecvStream = DefaultReader> {
        stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
        misc: Box<Misc>,
    }

    /// Decode error that you can get once you have sent the request and are
    /// decoding the response, e.g. from [`AtBlobContent::next`].
    ///
    /// This is similar to [`cyber_bao::io::DecodeError`], but takes into account
    /// that we are reading from a [`RecvStream`], so read errors will be
    /// propagated as [`DecodeError::Read`], containing a [`io::Error`].
    ///
    /// When the provider finds that it does not have a chunk that we requested,
    /// or that the chunk is invalid, it will stop sending data without producing
    /// an error. This is indicated by either the [`DecodeError::ParentNotFound`] or
    /// [`DecodeError::LeafNotFound`] variant, which can be used to detect that data
    /// is missing but the connection as well that the provider is otherwise healthy.
    ///
    /// The [`DecodeError::ParentHashMismatch`] and [`DecodeError::LeafHashMismatch`]
    /// variants indicate that the provider has sent us invalid data. A well-behaved
    /// provider should never do this, so this is an indication that the provider is
    /// not behaving correctly.
    #[non_exhaustive]
    #[stack_error(derive, add_meta)]
    pub enum DecodeError {
        /// A chunk was not found or invalid, so the provider stopped sending data
        #[error("not found")]
        ChunkNotFound {},
        /// A parent was not found or invalid, so the provider stopped sending data
        #[error("parent not found {node:?}")]
        ParentNotFound { node: TreeNode },
        /// A parent was not found or invalid, so the provider stopped sending data
        #[error("chunk not found {num}")]
        LeafNotFound { num: ChunkNum },
        /// The hash of a parent did not match the expected hash
        #[error("parent hash mismatch: {node:?}")]
        ParentHashMismatch { node: TreeNode },
        /// The hash of a leaf did not match the expected hash
        #[error("leaf hash mismatch: {num}")]
        LeafHashMismatch { num: ChunkNum },
        /// Error when reading from the stream
        #[error("read: {source}")]
        Read {
            #[error(std_err)]
            source: io::Error,
        },
        /// A generic io error
        #[error("io: {source}")]
        Write {
            #[error(std_err)]
            source: io::Error,
        },
    }

    impl DecodeError {
        pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
            e!(DecodeError::LeafHashMismatch { num })
        }
    }

    impl From<AtBlobHeaderNextError> for DecodeError {
        fn from(cause: AtBlobHeaderNextError) -> Self {
            match cause {
                AtBlobHeaderNextError::NotFound { .. } => e!(DecodeError::ChunkNotFound),
                AtBlobHeaderNextError::Read { source, .. } => e!(DecodeError::Read, source),
            }
        }
    }

    impl From<DecodeError> for io::Error {
        fn from(cause: DecodeError) -> Self {
            match cause {
                DecodeError::ParentNotFound { .. } => {
                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
                }
                DecodeError::LeafNotFound { .. } => {
                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
                }
                DecodeError::Read { source, .. } => source,
                DecodeError::Write { source, .. } => source,
                _ => io::Error::other(cause),
            }
        }
    }

    impl From<cyber_bao::io::DecodeError> for DecodeError {
        fn from(value: cyber_bao::io::DecodeError) -> Self {
            match value {
                cyber_bao::io::DecodeError::ParentNotFound(node) => {
                    e!(DecodeError::ParentNotFound { node })
                }
                cyber_bao::io::DecodeError::LeafNotFound(num) => {
                    e!(DecodeError::LeafNotFound { num })
                }
                cyber_bao::io::DecodeError::ParentHashMismatch(node) => {
                    e!(DecodeError::ParentHashMismatch { node })
                }
                cyber_bao::io::DecodeError::LeafHashMismatch(num) => {
                    e!(DecodeError::LeafHashMismatch { num })
                }
                cyber_bao::io::DecodeError::Io(cause) => e!(DecodeError::Read, cause),
            }
        }
    }

    /// The next state after reading a content item
    #[derive(Debug, From)]
    pub enum BlobContentNext<R: RecvStream> {
        /// We expect more content
        More(
            (
                AtBlobContent<R>,
                result::Result<BaoContentItem, DecodeError>,
            ),
        ),
        /// We are done with this blob
        Done(AtEndBlob<R>),
    }

    impl<R: RecvStream> AtBlobContent<R> {
        /// Read the next item, either content, an error, or the end of the blob
        pub async fn next(self) -> BlobContentNext<R> {
            match self.stream.next().await {
                ResponseDecoderNext::More((stream, res)) => {
                    let mut next = Self { stream, ..self };
                    let res = res.map_err(DecodeError::from);
                    match &res {
                        Ok(BaoContentItem::Parent(_)) => {
                            next.misc.other_bytes_read += 64;
                        }
                        Ok(BaoContentItem::Leaf(leaf)) => {
                            next.misc.payload_bytes_read += leaf.data.len() as u64;
                        }
                        _ => {}
                    }
                    BlobContentNext::More((next, res))
                }
                ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
                    stream: stream.into_inner(),
                    misc: self.misc,
                }),
            }
        }

        /// The geometry of the tree we are currently reading.
        pub fn tree(&self) -> cyber_bao::BaoTree {
            self.stream.tree()
        }

        /// The hash of the blob we are reading.
        pub fn hash(&self) -> Hash {
            (*self.stream.hash()).into()
        }

        /// The current offset of the blob we are reading.
        pub fn offset(&self) -> u64 {
            self.misc.ranges_iter.offset()
        }

        /// Current stats
        pub fn stats(&self) -> Stats {
            Stats {
                counters: self.misc.counters,
                elapsed: self.misc.start.elapsed(),
            }
        }

        /// Drain the response and throw away the result
        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, res)) => {
                        let _ = res?;
                        content = content1;
                    }
                    BlobContentNext::Done(end) => {
                        break Ok(end);
                    }
                }
            }
        }

        /// Concatenate the entire response into a vec
        pub async fn concatenate_into_vec(
            self,
        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
            let mut res = Vec::with_capacity(1024);
            let mut curr = self;
            let done = loop {
                match curr.next().await {
                    BlobContentNext::More((next, data)) => {
                        if let BaoContentItem::Leaf(leaf) = data? {
                            res.extend_from_slice(&leaf.data);
                        }
                        curr = next;
                    }
                    BlobContentNext::Done(done) => {
                        // we are done with the root blob
                        break done;
                    }
                }
            };
            Ok((done, res))
        }

        /// Write the entire blob to a slice writer and to an optional outboard.
        ///
        /// The outboard is only written to if the blob is larger than a single
        /// chunk group.
        pub async fn write_all_with_outboard<D, O>(
            self,
            mut outboard: Option<O>,
            mut data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
            O: OutboardMut,
        {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, item)) => {
                        content = content1;
                        match item? {
                            BaoContentItem::Parent(parent) => {
                                if let Some(outboard) = outboard.as_mut() {
                                    outboard
                                        .save(parent.node, &parent.pair)
                                        .await
                                        .map_err(|e| e!(DecodeError::Write, e))?;
                                }
                            }
                            BaoContentItem::Leaf(leaf) => {
                                data.write_bytes_at(leaf.offset, leaf.data)
                                    .await
                                    .map_err(|e| e!(DecodeError::Write, e))?;
                            }
                        }
                    }
                    BlobContentNext::Done(end) => {
                        return Ok(end);
                    }
                }
            }
        }

        /// Write the entire blob to a slice writer.
        pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
        {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, item)) => {
                        content = content1;
                        match item? {
                            BaoContentItem::Parent(_) => {}
                            BaoContentItem::Leaf(leaf) => {
                                data.write_bytes_at(leaf.offset, leaf.data)
                                    .await
                                    .map_err(|e| e!(DecodeError::Write, e))?;
                            }
                        }
                    }
                    BlobContentNext::Done(end) => {
                        return Ok(end);
                    }
                }
            }
        }

        /// Immediately finish the get response without reading further
        pub fn finish(self) -> AtClosing<R> {
            AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
        }
    }

    /// State after we have read all the content for a blob
    #[derive(Debug)]
    pub struct AtEndBlob<R: RecvStream = DefaultReader> {
        stream: R,
        misc: Box<Misc>,
    }

    /// The next state after the end of a blob
    #[derive(Debug, From)]
    pub enum EndBlobNext<R: RecvStream = DefaultReader> {
        /// Response is expected to have more children
        MoreChildren(AtStartChild<R>),
        /// No more children expected
        Closing(AtClosing<R>),
    }

    impl<R: RecvStream> AtEndBlob<R> {
        /// Read the next child, or finish
        pub fn next(mut self) -> EndBlobNext<R> {
            if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
                AtStartChild {
                    reader: self.stream,
                    offset,
                    ranges,
                    misc: self.misc,
                }
                .into()
            } else {
                AtClosing::new(self.misc, self.stream, true).into()
            }
        }
    }

    /// State when finishing the get response
    #[derive(Debug)]
    pub struct AtClosing<R: RecvStream = DefaultReader> {
        misc: Box<Misc>,
        reader: R,
        check_extra_data: bool,
    }

    impl<R: RecvStream> AtClosing<R> {
        fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
            Self {
                misc,
                reader,
                check_extra_data,
            }
        }

        /// Finish the get response, returning statistics
        pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
            // Shut down the stream
            let mut reader = self.reader;
            if self.check_extra_data {
                let rest = reader.recv_bytes(1).await?;
                if !rest.is_empty() {
                    error!("Unexpected extra data at the end of the stream");
                }
            }
            Ok(Stats {
                counters: self.misc.counters,
                elapsed: self.misc.start.elapsed(),
            })
        }
    }

    /// Error that you can get from [`AtBlobHeader::next`]
    #[stack_error(derive, add_meta, from_sources)]
    pub enum AtClosingNextError {
        /// Generic io error
        #[error(transparent)]
        Read {
            #[error(std_err)]
            source: io::Error,
        },
    }

    #[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
    pub struct RequestCounters {
        /// payload bytes written
        pub payload_bytes_written: u64,
        /// request, hash pair and size bytes written
        pub other_bytes_written: u64,
        /// payload bytes read
        pub payload_bytes_read: u64,
        /// hash pair and size bytes read
        pub other_bytes_read: u64,
    }

    /// Stuff we need to hold on to while going through the machine states
    #[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
    struct Misc {
        /// start time for statistics
        start: Instant,
        /// counters
        #[deref]
        #[deref_mut]
        counters: RequestCounters,
        /// iterator over the ranges of the collection and the children
        ranges_iter: RangesIter,
    }
}

Neighbours