#![cfg_attr(feature = "hide-proto-docs", doc(hidden))]
use std::{
fmt::{self, Debug},
io,
num::NonZeroU64,
ops::{Bound, RangeBounds},
path::PathBuf,
pin::Pin,
};
use arrayvec::ArrayString;
use cyber_bao::{
io::{mixed::EncodedItem, BaoContentItem, Leaf},
ChunkRanges,
};
use bytes::Bytes;
use irpc::{
channel::{mpsc, oneshot},
rpc_requests,
};
use n0_future::Stream;
use range_collections::RangeSet2;
use serde::{Deserialize, Serialize};
pub(crate) mod bitfield;
pub use bitfield::Bitfield;
use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
#[allow(dead_code)]
pub(crate) trait HashSpecific {
fn hash(&self) -> Hash;
fn hash_short(&self) -> ArrayString<10> {
self.hash().fmt_short()
}
}
impl HashSpecific for ImportBaoMsg {
fn hash(&self) -> crate::Hash {
self.inner.hash
}
}
impl HashSpecific for ObserveMsg {
fn hash(&self) -> crate::Hash {
self.inner.hash
}
}
impl HashSpecific for ExportBaoMsg {
fn hash(&self) -> crate::Hash {
self.inner.hash
}
}
impl HashSpecific for ExportRangesMsg {
fn hash(&self) -> crate::Hash {
self.inner.hash
}
}
impl HashSpecific for ExportPathMsg {
fn hash(&self) -> crate::Hash {
self.inner.hash
}
}
pub type BoxedByteStream = Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send + Sync + 'static>>;
impl HashSpecific for CreateTagMsg {
fn hash(&self) -> crate::Hash {
self.inner.value.hash
}
}
#[rpc_requests(message = Command, alias = "Msg", rpc_feature = "rpc")]
#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
ListBlobs(ListRequest),
#[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
Batch(BatchRequest),
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
DeleteBlobs(BlobDeleteRequest),
#[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
ImportBao(ImportBaoRequest),
#[rpc(tx = mpsc::Sender<EncodedItem>)]
ExportBao(ExportBaoRequest),
#[rpc(tx = mpsc::Sender<ExportRangesItem>)]
ExportRanges(ExportRangesRequest),
#[rpc(tx = mpsc::Sender<Bitfield>)]
Observe(ObserveRequest),
#[rpc(tx = oneshot::Sender<BlobStatus>)]
BlobStatus(BlobStatusRequest),
#[rpc(tx = mpsc::Sender<AddProgressItem>)]
ImportBytes(ImportBytesRequest),
#[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
ImportByteStream(ImportByteStreamRequest),
#[rpc(tx = mpsc::Sender<AddProgressItem>)]
ImportPath(ImportPathRequest),
#[rpc(tx = mpsc::Sender<ExportProgressItem>)]
ExportPath(ExportPathRequest),
#[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
ListTags(ListTagsRequest),
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
SetTag(SetTagRequest),
#[rpc(tx = oneshot::Sender<super::Result<u64>>)]
DeleteTags(DeleteTagsRequest),
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
RenameTag(RenameTagRequest),
#[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
CreateTag(CreateTagRequest),
#[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
ListTempTags(ListTempTagsRequest),
#[rpc(tx = oneshot::Sender<TempTag>)]
CreateTempTag(CreateTempTagRequest),
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
SyncDb(SyncDbRequest),
#[rpc(tx = oneshot::Sender<()>)]
WaitIdle(WaitIdleRequest),
#[rpc(tx = oneshot::Sender<()>)]
Shutdown(ShutdownRequest),
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
ClearProtected(ClearProtectedRequest),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct WaitIdleRequest;
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncDbRequest;
#[derive(Debug, Serialize, Deserialize)]
pub struct ShutdownRequest;
#[derive(Debug, Serialize, Deserialize)]
pub struct ClearProtectedRequest;
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobStatusRequest {
pub hash: Hash,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ListRequest;
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchRequest;
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchResponse {
Drop(HashAndFormat),
Ping,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobDeleteRequest {
pub hashes: Vec<Hash>,
pub force: bool,
}
#[derive(Serialize, Deserialize)]
pub struct ImportBytesRequest {
pub data: Bytes,
pub format: BlobFormat,
pub scope: Scope,
}
impl fmt::Debug for ImportBytesRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ImportBytes")
.field("data", &self.data.len())
.field("format", &self.format)
.field("scope", &self.scope)
.finish()
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportPathRequest {
pub path: PathBuf,
pub mode: ImportMode,
pub format: BlobFormat,
pub scope: Scope,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportBaoRequest {
pub hash: Hash,
pub size: NonZeroU64,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ObserveRequest {
pub hash: Hash,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportBaoRequest {
pub hash: Hash,
pub ranges: ChunkRanges,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportRangesRequest {
pub hash: Hash,
pub ranges: RangeSet2<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportPathRequest {
pub hash: Hash,
pub mode: ExportMode,
pub target: PathBuf,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportByteStreamRequest {
pub format: BlobFormat,
pub scope: Scope,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ImportByteStreamUpdate {
Bytes(Bytes),
Done,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListTagsRequest {
pub hash_seq: bool,
pub raw: bool,
pub from: Option<Tag>,
pub to: Option<Tag>,
}
impl ListTagsRequest {
pub fn range<R, E>(range: R) -> Self
where
R: RangeBounds<E>,
E: AsRef<[u8]>,
{
let (from, to) = tags_from_range(range);
Self {
from,
to,
raw: true,
hash_seq: true,
}
}
pub fn prefix(prefix: &[u8]) -> Self {
let from = Tag::from(prefix);
let to = from.next_prefix();
Self {
raw: true,
hash_seq: true,
from: Some(from),
to,
}
}
pub fn single(name: &[u8]) -> Self {
let from = Tag::from(name);
Self {
to: Some(from.successor()),
from: Some(from),
raw: true,
hash_seq: true,
}
}
pub fn all() -> Self {
Self {
raw: true,
hash_seq: true,
from: None,
to: None,
}
}
pub fn raw() -> Self {
Self {
raw: true,
hash_seq: false,
from: None,
to: None,
}
}
pub fn hash_seq() -> Self {
Self {
raw: false,
hash_seq: true,
from: None,
to: None,
}
}
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TagInfo {
pub name: Tag,
pub format: BlobFormat,
pub hash: Hash,
}
impl From<TagInfo> for HashAndFormat {
fn from(tag_info: TagInfo) -> Self {
HashAndFormat {
hash: tag_info.hash,
format: tag_info.format,
}
}
}
impl TagInfo {
pub fn new(name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Self {
let name = name.as_ref();
let value = value.into();
Self {
name: Tag::from(name),
hash: value.hash,
format: value.format,
}
}
pub fn hash_and_format(&self) -> HashAndFormat {
HashAndFormat {
hash: self.hash,
format: self.format,
}
}
}
pub(crate) fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
where
R: RangeBounds<E>,
E: AsRef<[u8]>,
{
let from = match range.start_bound() {
Bound::Included(start) => Some(Tag::from(start.as_ref())),
Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
Bound::Unbounded => None,
};
let to = match range.end_bound() {
Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
Bound::Unbounded => None,
};
(from, to)
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTempTagRequest {
pub scope: Scope,
pub value: HashAndFormat,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ListTempTagsRequest;
#[derive(Debug, Serialize, Deserialize)]
pub struct RenameTagRequest {
pub from: Tag,
pub to: Tag,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteTagsRequest {
pub from: Option<Tag>,
pub to: Option<Tag>,
}
impl DeleteTagsRequest {
pub fn single(name: &[u8]) -> Self {
let name = Tag::from(name);
Self {
to: Some(name.successor()),
from: Some(name),
}
}
pub fn range<R, E>(range: R) -> Self
where
R: RangeBounds<E>,
E: AsRef<[u8]>,
{
let (from, to) = tags_from_range(range);
Self { from, to }
}
pub fn prefix(prefix: &[u8]) -> Self {
let from = Tag::from(prefix);
let to = from.next_prefix();
Self {
from: Some(from),
to,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SetTagRequest {
pub name: Tag,
pub value: HashAndFormat,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTagRequest {
pub value: HashAndFormat,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ProcessExitRequest {
pub code: i32,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum AddProgressItem {
CopyProgress(u64),
Size(u64),
CopyDone,
OutboardProgress(u64),
Done(TempTag),
Error(#[serde(with = "crate::util::serde::io_error_serde")] io::Error),
}
impl From<io::Error> for AddProgressItem {
fn from(e: io::Error) -> Self {
Self::Error(e)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ExportRangesItem {
Size(u64),
Data(Leaf),
Error(super::Error),
}
impl From<super::Error> for ExportRangesItem {
fn from(e: super::Error) -> Self {
Self::Error(e)
}
}
impl From<Leaf> for ExportRangesItem {
fn from(leaf: Leaf) -> Self {
Self::Data(leaf)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ExportProgressItem {
Size(u64),
CopyProgress(u64),
Done,
Error(super::Error),
}
impl From<super::Error> for ExportProgressItem {
fn from(e: super::Error) -> Self {
Self::Error(e)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
#[default]
Copy,
TryReference,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
#[default]
Copy,
TryReference,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
NotFound,
Partial {
size: Option<u64>,
},
Complete {
size: u64,
},
}
#[derive(
Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display,
)]
pub struct Scope(pub(crate) u64);
impl Scope {
pub const GLOBAL: Self = Self(0);
}
impl std::fmt::Debug for Scope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.0 == 0 {
write!(f, "Global")
} else {
f.debug_tuple("Scope").field(&self.0).finish()
}
}
}