File size: 4,573 Bytes
932db78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0fd4a4d
932db78
0fd4a4d
932db78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0fd4a4d
932db78
 
 
 
 
9a3c8f5
932db78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a3c8f5
 
932db78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import pickle
import asyncio
from pathlib import Path
from pydantic import BaseModel

from typing import Any
from PIL import Image
# Readable aliases for ``str`` used in annotations (object names, object ids,
# bucket names); they add no runtime checking.
ObjectName = ObjectId = BucketName = str

from .utils import image_to_object, object_to_image, write_compressed, read_compressed
from .wrapper import ImgurClient as Imgur

# Directory (next to this module) where pickled buckets are written and read.
BUCKETS_FOLDER = Path(__file__).parent / 'buckets'
# Suffix appended to a bucket name to form its file name.
EXTENSION = '.bkt'

def filename(name: str) -> str:
    """Return the file name used to store the bucket called ``name``."""
    bucket_file = name + EXTENSION
    return bucket_file


class UploadedObject(BaseModel):
    """Represents an object that has been uploaded.

    The wrapped object can be kept in memory in ``cached_obj`` so repeated
    ``download`` calls in the same process skip the network round-trip. The
    cache is dropped from the pickled state, so it is not written to disk.
    """
    # Name the object is tracked under (the key used by ``Bucket.uploaded``).
    name: ObjectName = None
    # Identifier handed to ``download_image`` to fetch the uploaded image.
    obj_id: ObjectId
    # Token handed to ``delete_image`` to remove the uploaded image.
    deletehash: str = None
    # In-memory copy of the wrapped object; ``None`` when nothing is cached.
    cached_obj: Any = None

    def __getstate__(self):
        """Return the pickle state with ``cached_obj`` stripped out.

        The parent state is copied before it is edited. pydantic (v1) returns
        the instance's live ``__dict__`` and ``__fields_set__`` from
        ``__getstate__``, so editing them in place would clear the cache on
        the in-memory object every time it is pickled.
        """
        state = super().__getstate__()

        values = dict(state['__dict__'])
        values['cached_obj'] = None

        fields_set = set(state['__fields_set__'])
        fields_set.discard('cached_obj')

        return {**state, '__dict__': values, '__fields_set__': fields_set}

    async def download(self) -> Any:
        """Return the wrapped object.

        A cached object is returned directly. Otherwise the image is
        downloaded by ``obj_id``, converted with ``image_to_object`` and
        cached on this instance before being returned.
        """
        if self.cached_obj is not None:
            return self.cached_obj

        async with Imgur() as imgur:
            img = await imgur.download_image(self.obj_id)

        obj = self.cached_obj = image_to_object(img)
        return obj

    async def delete(self) -> None:
        """Delete the uploaded object using its ``deletehash``."""
        async with Imgur() as imgur:
            await imgur.delete_image(self.deletehash)


class StagedObject(BaseModel):
    """A named object that has been added to a bucket and is waiting to be
    uploaded."""
    name: ObjectName
    obj: Any

    def image(self) -> Image.Image:
        """Return the wrapped object converted by ``object_to_image``."""
        encoded = object_to_image(self.obj)
        return encoded

    async def upload(self) -> UploadedObject:
        """Upload the wrapped object and hand back its UploadedObject.

        The original object is attached to the returned UploadedObject as its
        cache, so fetching it again in the same process needs no download.
        """
        async with Imgur() as client:
            receipt = await client.upload_image(self.image())

        # upload_image yields the object id followed by the delete hash
        obj_id, deletehash = receipt
        fields = {
            'name': self.name,
            'obj_id': obj_id,
            'deletehash': deletehash,
            'cached_obj': self.obj,
        }
        return UploadedObject(**fields)


class Bucket:
    """A named collection of objects, persisted as a compressed pickle.

    Objects are first staged (``pending``) and move to ``uploaded`` once
    ``commit`` has uploaded them. Staging, unstaging, committing and deleting
    an object each write the bucket to ``BUCKETS_FOLDER``.
    """

    def __init__(self, name) -> None:
        self.name = name
        # objects already uploaded, keyed by object name
        self.uploaded: dict[ObjectName, UploadedObject] = {}
        # objects staged locally and waiting for ``commit``, keyed by name
        self.pending: dict[ObjectName, StagedObject] = {}

    def __repr__(self) -> str:
        n_pending = len(self.pending)
        n_uploaded = len(self.uploaded)
        return f'<Bucket {self.name} (pending: {n_pending}, uploaded: {n_uploaded})>'

    def _save(self):
        """Pickle and dump the bucket to the buckets folder."""
        # Create the folder on first use so the write cannot fail just
        # because the directory has not been created yet.
        BUCKETS_FOLDER.mkdir(parents=True, exist_ok=True)
        fn = filename(self.name)
        write_compressed(pickle.dumps(self), BUCKETS_FOLDER / fn)

    def stage_obj(self, obj: Any, name: str) -> None:
        """Stage ``obj`` under ``name`` (replacing any staged object with the
        same name) and save the bucket."""
        self.pending[name] = StagedObject(obj=obj, name=name)

        self._save()

    def unstage_obj(self, name: str) -> None:
        """Remove the staged object called ``name`` and save the bucket.

        Raises:
            KeyError: if no object with that name is staged.
        """
        del self.pending[name]
        self._save()

    async def commit(self):
        """Upload all staged objects.

        Returns:
            The list of ``UploadedObject`` created by this commit.

        Raises:
            Warning: if at least one staged object failed to upload. The
                successful uploads have already been recorded and saved at
                that point; the failed objects stay in ``pending``.
        """
        coros = [o.upload() for o in self.pending.values()]

        # upload concurrently and filter out errors
        results = await asyncio.gather(*coros, return_exceptions=True)
        uploaded = [e for e in results if isinstance(e, UploadedObject)]

        # remove successful uploads from pending
        for o in uploaded:
            del self.pending[o.name]

        # track uploaded objects
        self.uploaded.update({o.name: o for o in uploaded})

        self._save()

        # report objects that are still pending
        # NOTE(review): this raises instead of calling ``warnings.warn``, so
        # the list of uploads is not returned when any upload fails; kept
        # as-is because callers may catch ``Warning``.
        if self.pending:
            msg = (
                f"{len(self.pending)} objects failed to upload:\n" +
                '\n'.join(o.name for o in self.pending.values())
            )
            raise Warning(msg)

        return uploaded

    async def get_obj(self, name: str) -> Any:
        """Return the uploaded object called ``name``.

        Raises:
            KeyError: if no uploaded object has that name.
        """
        return await self.uploaded[name].download()

    async def delete_obj(self, name: str) -> None:
        """Remove the object with the given name from uploaded objects.

        Raises:
            ValueError: if no uploaded object has that name.
        """
        try:
            target = self.uploaded[name]
        except KeyError:
            raise ValueError(f'No obj with name {name} found in {self}') from None

        await target.delete()

        # Forget the object only after the remote delete succeeded, so a
        # failed delete does not lose the record needed to retry it.
        self.uploaded.pop(name, None)
        self._save()

    async def delete(self):
        """Delete the bucket and all objects it holds."""
        coros = [o.delete() for o in self.uploaded.values()]
        await asyncio.gather(*coros)
        # A bucket that was never saved has no file; that is not an error.
        (BUCKETS_FOLDER / filename(self.name)).unlink(missing_ok=True)

    @classmethod
    def load(cls, name: str) -> "Bucket":
        """Load the bucket called ``name`` from the buckets folder."""
        fn = filename(name)
        # SECURITY: unpickling can execute arbitrary code; only load bucket
        # files that this program wrote itself.
        return pickle.loads(read_compressed(BUCKETS_FOLDER / fn))