import pytest
import tempfile
import random
import os
import time
import iroh
import asyncio
from iroh import Hash, Iroh, SetTagOption, BlobFormat, WrapOption, AddProgressType, NodeOptions
def test_hash():
    """Check that a known hash agrees across its string, byte and hex forms."""
    expected_str = "2kbxxbofqx5rau77wzafrj4yntjb4gn4olfpwxmv26js6dvhgjhq"
    expected_hex = "d2837b85c585fb1053ffb64058a7986cd21e19bc72cafb5d95d7932f0ea7324f"
    expected_raw = b'\xd2\x83\x7b\x85\xc5\x85\xfb\x10\x53\xff\xb6\x40\x58\xa7\x98\x6c\xd2\x1e\x19\xbc\x72\xca\xfb\x5d\x95\xd7\x93\x2f\x0e\xa7\x32\x4f'

    # Build the hash from its string form and check every representation.
    from_text = Hash.from_string(expected_str)
    assert str(from_text) == expected_str
    assert from_text.to_bytes() == expected_raw
    assert from_text.to_hex() == expected_hex

    # Build the same hash from raw bytes and repeat the checks.
    from_raw = Hash.from_bytes(expected_raw)
    assert str(from_raw) == expected_str
    assert from_raw.to_bytes() == expected_raw
    assert from_raw.to_hex() == expected_hex

    # Equality must hold in both directions.
    assert from_text.equal(from_raw)
    assert from_raw.equal(from_text)
@pytest.mark.asyncio
async def test_blob_add_get_bytes():
    """Add a random in-memory blob to a persistent node and read it back."""
    # Register the running loop with the uniffi bindings before any iroh call.
    iroh.iroh_ffi.uniffi_set_event_loop(asyncio.get_running_loop())

    # Keep a reference to the temporary directory for the whole test so the
    # node's storage location stays in place.
    node_dir = tempfile.TemporaryDirectory()
    node = await Iroh.persistent(node_dir.name)

    blob_size = 100
    payload = bytearray(random.getrandbits(8) for _ in range(blob_size))

    outcome = await node.blobs().add_bytes(payload)
    assert outcome.format == BlobFormat.RAW
    assert outcome.size == blob_size

    blob_hash = outcome.hash

    reported_size = await node.blobs().size(blob_hash)
    assert reported_size == blob_size

    stored = await node.blobs().read_to_bytes(blob_hash)
    assert len(stored) == blob_size
    assert stored == payload

    is_present = await node.blobs().has(blob_hash)
    assert is_present
@pytest.mark.asyncio
async def test_blob_read_write_path():
    """Import a blob from a file, then read it back in memory and via a file.

    The test writes random bytes to an input file, adds that file to a
    persistent node with ``add_from_path``, and checks that both
    ``read_to_bytes`` and ``write_to_path`` return the same content.
    """
    # Register the running loop with the uniffi bindings before any iroh call.
    iroh.iroh_ffi.uniffi_set_event_loop(asyncio.get_running_loop())

    iroh_dir = tempfile.TemporaryDirectory()
    node = await Iroh.persistent(iroh_dir.name)

    blob_size = 100
    payload = bytearray(map(random.getrandbits, (8,) * blob_size))

    # Separate scratch directory for the input and output files.
    work_dir = tempfile.TemporaryDirectory()
    in_path = os.path.join(work_dir.name, "in")
    # Context manager guarantees the handle is closed even if the write fails.
    with open(in_path, "wb") as in_file:
        in_file.write(payload)

    tag = SetTagOption.auto()
    wrap = WrapOption.no_wrap()

    class AddCallback:
        """Records the hash and format reported by the ALL_DONE event."""

        hash = None
        format = None

        async def progress(self, progress_event):
            event_type = progress_event.type()
            print(event_type)
            if event_type == AddProgressType.ALL_DONE:
                all_done_event = progress_event.as_all_done()
                self.hash = all_done_event.hash
                print(all_done_event.hash)
                print(all_done_event.format)
                self.format = all_done_event.format
            if event_type == AddProgressType.ABORT:
                abort_event = progress_event.as_abort()
                raise Exception(abort_event.error)

    cb = AddCallback()
    # NOTE(review): the second positional argument is presumably the
    # "in place" flag of add_from_path -- confirm against the bindings.
    await node.blobs().add_from_path(in_path, False, tag, wrap, cb)

    assert cb.format == BlobFormat.RAW
    assert cb.hash is not None

    got_size = await node.blobs().size(cb.hash)
    assert got_size == blob_size

    # Read the blob straight into memory.
    got_bytes = await node.blobs().read_to_bytes(cb.hash)
    print("read_to_bytes {}".format(got_bytes))
    assert len(got_bytes) == blob_size
    assert got_bytes == payload

    # Export the blob to a file and compare the file content.
    out_path = os.path.join(work_dir.name, "out")
    await node.blobs().write_to_path(cb.hash, out_path)
    with open(out_path, "rb") as out_file:
        got_bytes = out_file.read()
    print("write_to_path {}".format(got_bytes))
    assert len(got_bytes) == blob_size
    assert got_bytes == payload
@pytest.mark.asyncio
async def test_blob_collections():
    """Add a directory of files and check the resulting collection.

    The test writes ``num_files`` random files into a temporary directory,
    imports the directory with ``add_from_path``, and checks the hashes
    reported through the progress callback against ``list_collections`` and
    ``list``.
    """
    # Register the running loop with the uniffi bindings before any iroh call.
    iroh.iroh_ffi.uniffi_set_event_loop(asyncio.get_running_loop())

    collection_dir = tempfile.TemporaryDirectory()
    num_files = 3
    blob_size = 100
    for index in range(num_files):
        file_path = os.path.join(collection_dir.name, str(index))
        payload = bytearray(map(random.getrandbits, (8,) * blob_size))
        # Context manager guarantees the handle is closed even on failure.
        with open(file_path, "wb") as blob_file:
            blob_file.write(payload)
    # NOTE(review): this prints the size of the TemporaryDirectory object,
    # not of the directory on disk; kept as existing debug output.
    print(collection_dir.__sizeof__())

    iroh_dir = tempfile.TemporaryDirectory()
    node = await Iroh.persistent(iroh_dir.name)

    # A fresh node is expected to hold no blobs.
    blobs = await node.blobs().list()
    assert len(blobs) == 0

    class AddCallback:
        """Collects the per-blob hashes and the final collection hash."""

        def __init__(self):
            self.collection_hash = None
            self.format = None
            # Per-instance list: a class-level list would be shared by every
            # AddCallback instance.
            self.blob_hashes = []

        async def progress(self, progress_event):
            event_type = progress_event.type()
            print(event_type)
            if event_type == AddProgressType.ALL_DONE:
                all_done_event = progress_event.as_all_done()
                self.collection_hash = all_done_event.hash
                self.format = all_done_event.format
            if event_type == AddProgressType.ABORT:
                abort_event = progress_event.as_abort()
                raise Exception(abort_event.error)
            if event_type == AddProgressType.DONE:
                done_event = progress_event.as_done()
                print(done_event.hash)
                self.blob_hashes.append(done_event.hash)

    cb = AddCallback()
    tag = SetTagOption.auto()
    wrap = WrapOption.no_wrap()
    await node.blobs().add_from_path(collection_dir.name, False, tag, wrap, cb)

    assert cb.collection_hash is not None
    assert cb.format == BlobFormat.HASH_SEQ

    collections = await node.blobs().list_collections()
    # Check the length first so an empty result fails with a clear assertion
    # rather than an IndexError from the print below.
    assert len(collections) == 1
    print("collection hash ", collections[0].hash)
    assert collections[0].hash.equal(cb.collection_hash)
    # NOTE(review): 4 is presumably the 3 files plus one metadata blob --
    # confirm against the iroh collection format.
    assert collections[0].total_blobs_count == 4

    # Build a new list instead of appending to the callback's own list.
    collection_hashes = cb.blob_hashes + [cb.collection_hash]

    got_hashes = await node.blobs().list()
    for got_hash in got_hashes:
        blob = await node.blobs().read_to_bytes(got_hash)
        print("hash ", got_hash, " has size ", len(blob))

    hashes_exist(collection_hashes, got_hashes)
    # The store lists one more hash than the callback reported.
    # NOTE(review): presumably the collection's metadata blob -- confirm.
    assert len(collection_hashes) + 1 == len(got_hashes)
@pytest.mark.asyncio
async def test_list_and_delete():
    """Add several blobs, delete one tag, and check the blob disappears.

    The node is created with ``gc_interval_millis=100``. After deleting the
    tag of the first blob the test waits 0.5 s and then expects the blob
    list to contain one entry fewer.
    """
    # Register the running loop with the uniffi bindings before any iroh call.
    iroh.iroh_ffi.uniffi_set_event_loop(asyncio.get_running_loop())

    iroh_dir = tempfile.TemporaryDirectory()
    opts = NodeOptions(gc_interval_millis=100)
    node = await Iroh.persistent_with_options(iroh_dir.name, opts)

    blob_size = 100
    num_blobs = 3
    blobs = []
    for index in range(num_blobs):
        print(index)
        blobs.append(bytearray(map(random.getrandbits, (8,) * blob_size)))

    hashes = []
    tags = []
    for blob in blobs:
        output = await node.blobs().add_bytes(blob)
        hashes.append(output.hash)
        tags.append(output.tag)

    got_hashes = await node.blobs().list()
    assert len(got_hashes) == num_blobs
    hashes_exist(hashes, got_hashes)

    # Drop the tag of the first blob; its hash must no longer be listed.
    remove_hash = hashes.pop(0)
    remove_tag = tags.pop(0)
    await node.tags().delete(remove_tag)

    # Wait without blocking the event loop (time.sleep would stall every
    # other task scheduled on it).
    # NOTE(review): presumably this gives the 100 ms GC interval time to
    # collect the untagged blob -- confirm.
    await asyncio.sleep(0.5)

    got_hashes = await node.blobs().list()
    assert len(got_hashes) == num_blobs - 1
    hashes_exist(hashes, got_hashes)

    for got_hash in got_hashes:
        if remove_hash.equal(got_hash):
            raise Exception(
                "blob {} should have been removed".format(remove_hash)
            )
def hashes_exist(expect, got):
    """Check that every hash in ``expect`` has an equal entry in ``got``.

    Entries are compared by calling ``equal`` on each element of ``got``
    with the expected hash as argument.

    Args:
        expect: Hashes that must be present.
        got: Hashes to search; iterated once per expected hash.

    Raises:
        Exception: For the first hash in ``expect`` that has no equal entry
            in ``got``. The message is a single formatted string.
    """
    for wanted in expect:
        if not any(candidate.equal(wanted) for candidate in got):
            raise Exception("could not find {} in list".format(wanted))