re #5 - Allow conversion between transient and persistent mode

This commit is contained in:
Jamie Cockburn 2021-07-06 14:53:29 +01:00
parent 1242eef6ec
commit 3e4ed8cec3
6 changed files with 142 additions and 36 deletions

View file

@ -59,6 +59,8 @@ have wait until all 2 million items have been parsed first.
`json_stream.load()` has two modes of operation, controlled by
the `persistent` argument (default `False`).
It is also possible to "mix" the modes as you consume the data.
#### Transient mode (default)
This mode is appropriate if you can consume the data iteratively. You cannot
@ -143,6 +145,58 @@ data.read_all()
Persistent mode is not appropriate if you care about memory consumption, but
provides an identical experience compared to `json.load()`.
#### Mixed mode
In some cases you will need to be able to randomly access some part of the
data, but still only have that specific data taking up memory resources.
For example, you might have a very long list of objects, but you cannot always
access the keys of the objects in stream order. You want to be able to iterate
the list transiently, but access the result objects persistently.
This can be achieved using the `persistent()` method of all the `list` or
`dict`-like objects json_stream produces. Calling `persistent()` causes the existing
transient object to produce persistent child objects.
Note that the `persistent()` method makes the children of the object it
is called on persistent, not the object it is called on.
```python
import json_stream
# JSON: {"results": [{"x": 1, "y": 3}, {"y": 4, "x": 2}]}
# note that the keys of the inner objects are not ordered
data = json_stream.load(f) # data is a transient dict-like object
# iterate transient list, but produce persistent items
for result in data['results'].persistent():
# result is a persistent dict-like object
print(result['x']) # print x
print(result['y']) # print y (error on second result without .persistent())
print(result['x']) # print x again (error without .persistent())
```
The opposite is also possible, going from persistent mode to transient mode, though
the use cases for this are more esoteric.
```python
# JSON: {"a": 1, "x": ["long", "list", "I", "don't", "want", "in", "memory"], "b": 2}
data = load(StringIO(json), persistent=True).transient()
# data is a persistent dict-like object that produces transient children
print(data["a"]) # prints 1
x = data["x"] # x is a transient list, you can use it accordingly
print(x[0]) # prints long
# access earlier data from memory
print(data["a"]) # this would have raised an exception if data was transient
print(data["b"]) # prints 2
# we have now moved past all the data in the transient list
print(x[0]) # will raise exception
```
### visitor pattern
You can also parse using a visitor-style approach where a function you supply

View file

@ -1,6 +1,6 @@
[metadata]
name = json-stream
version = 1.0.2
version = 1.1.0
author = Jamie Cockburn
author_email = jamie_cockburn@hotmail.co.uk
description = Streaming JSON decoder

View file

@ -1,4 +1,5 @@
import collections
import copy
from abc import ABC
from collections import OrderedDict
from itertools import chain
@ -13,8 +14,20 @@ class TransientAccessException(Exception):
class StreamingJSONBase(ABC):
@classmethod
def factory(cls, token, token_stream):
raise NotImplementedError() # pragma: no cover
def factory(cls, token, token_stream, persistent):
if persistent:
if token == '{':
return PersistentStreamingJSONObject(token_stream)
if token == '[':
return PersistentStreamingJSONList(token_stream)
else:
if token == '{':
return TransientStreamingJSONObject(token_stream)
if token == '[':
return TransientStreamingJSONList(token_stream)
raise ValueError(f"Unknown operator {token}") # pragma: no cover
_persistent_children: bool
def __init__(self, token_stream):
self.streaming = True
@ -59,23 +72,26 @@ class StreamingJSONBase(ABC):
def __iter__(self) -> Iterator[str]:
raise NotImplementedError() # pragma: no cover
# Hook used by copy.copy(): shallow copying is refused, this always raises copy.Error.
# NOTE(review): presumably because a copy would share the same underlying token stream - confirm.
def __copy__(self):
raise copy.Error("Copying json_steam objects leads to a bad time")
# Hook used by copy.deepcopy(): deep copying is refused, this always raises copy.Error.
# The `memo` argument is accepted to satisfy the protocol but is never used.
def __deepcopy__(self, memo):
raise copy.Error("Copying json_steam objects leads to a bad time")
class PersistentStreamingJSONBase(StreamingJSONBase, ABC):
@classmethod
def factory(cls, token, token_stream, _=None):
if token == '{':
return PersistentStreamingJSONObject(token_stream)
if token == '[':
return PersistentStreamingJSONList(token_stream)
raise ValueError(f"Unknown operator {token}") # pragma: no cover
# Initialise a persistent container: run the base initialiser, populate self._data
# from the _init_persistent_data() hook, and default to producing persistent children.
def __init__(self, token_stream):
super().__init__(token_stream)
self._data = self._init_persistent_data()
self._persistent_children = True  # passed to factory() whenever a child container is created
def _init_persistent_data(self):
raise NotImplementedError() # pragma: no cover
# Make this persistent object produce transient child containers.
# Only the flag handed to factory() for children is changed; the object itself is
# returned so the call can be chained, e.g. load(fp, persistent=True).transient().
def transient(self):
self._persistent_children = False
return self
def __iter__(self):
return chain(self._data, self._get__iter__())
@ -88,17 +104,10 @@ class PersistentStreamingJSONBase(StreamingJSONBase, ABC):
class TransientStreamingJSONBase(StreamingJSONBase, ABC):
@classmethod
def factory(cls, token, token_stream, _=None):
if token == '{':
return TransientStreamingJSONObject(token_stream)
if token == '[':
return TransientStreamingJSONList(token_stream)
raise ValueError(f"Unknown operator {token}") # pragma: no cover
# Initialise a transient container: run the base initialiser, mark iteration as
# not yet started, and default to producing transient children.
def __init__(self, token_stream):
super().__init__(token_stream)
self._started = False  # checked by _check_started() to refuse a second iteration
self._persistent_children = False  # passed to factory() whenever a child container is created
def _iter_items(self):
self._started = True
@ -108,9 +117,17 @@ class TransientStreamingJSONBase(StreamingJSONBase, ABC):
return self._find_item(k)
# Return the streaming iterator for this object.
# _check_started() raises TransientAccessException if self._started is already set.
def __iter__(self):
self._check_started()
return self._get__iter__()
# Make this transient object produce persistent child containers.
# Raises TransientAccessException (via _check_started) if self._started is already set.
# Returns self so the call can be used inline, e.g. `for item in data.persistent():`.
def persistent(self):
self._check_started()
self._persistent_children = True
return self
def _check_started(self):
if self._started:
raise TransientAccessException("Cannot restart iteration of transient JSON stream")
return self._get__iter__()
def __repr__(self): # pragma: no cover
return f"<{type(self).__name__}: TRANSIENT, {'STREAMING' if self.streaming else 'DONE'}>"
@ -129,7 +146,7 @@ class StreamingJSONList(StreamingJSONBase, ABC):
else: # pragma: no cover
raise ValueError(f"Expecting value, comma or ], got {v}")
if token_type == TokenType.OPERATOR:
self._child = v = self.factory(v, self._stream)
self._child = v = self.factory(v, self._stream, self._persistent_children)
return v
def _get__iter__(self):
@ -197,7 +214,7 @@ class StreamingJSONObject(StreamingJSONBase, ABC):
token_type, v = next(self._stream)
if token_type == TokenType.OPERATOR:
self._child = v = self.factory(v, self._stream)
self._child = v = self.factory(v, self._stream, self._persistent_children)
return k, v
def _get__iter__(self):
@ -258,16 +275,13 @@ class TransientStreamingJSONObject(TransientStreamingJSONBase, StreamingJSONObje
raise
def items(self):
if self._started:
raise TransientAccessException("Cannot restart iteration of transient JSON stream")
self._check_started()
return self._iter_items()
def keys(self):
if self._started:
raise TransientAccessException("Cannot restart iteration of transient JSON stream")
self._check_started()
return (k for k, v in self._iter_items())
def values(self):
if self._started:
raise TransientAccessException("Cannot restart iteration of transient JSON stream")
self._check_started()
return (v for k, v in self._iter_items())

View file

@ -1,12 +1,8 @@
from json_stream.base import StreamingJSONBase, PersistentStreamingJSONBase, TransientStreamingJSONBase
from json_stream.base import StreamingJSONBase
from json_stream.tokenizer import tokenize
def load(fp, persistent=False):
token_stream = tokenize(fp)
_, token = next(token_stream)
if persistent:
return PersistentStreamingJSONBase.factory(token, token_stream)
else:
return TransientStreamingJSONBase.factory(token, token_stream)
return StreamingJSONBase.factory(token, token_stream, persistent)

View file

@ -1,3 +1,4 @@
import copy
import json
from io import StringIO
from itertools import zip_longest
@ -170,6 +171,47 @@ class TestLoader(TestCase):
self.assertListEqual(["a"], list(item))
self.assertEqual(list(items), ['b', 'c'])
# copy.copy() and copy.deepcopy() on a freshly loaded stream must both raise
# copy.Error carrying the exact refusal message.
def test_not_copiable(self):
json = '[["a"], "b", "c"]'
with self.assertRaisesRegex(copy.Error, "^Copying json_steam objects leads to a bad time$"):
copy.copy(load(StringIO(json)))
with self.assertRaisesRegex(copy.Error, "^Copying json_steam objects leads to a bad time$"):
copy.deepcopy(load(StringIO(json)))
# Calling persistent() on a transient list must yield PersistentStreamingJSONObject
# items whose keys can be read out of stream order and more than once.
def test_transient_to_persistent(self):
json = '{"results": [{"x": 1, "y": 3}, {"y": 4, "x": 2}]}'
# expected x and y values, in the order the result objects are iterated
xs = iter((1, 2))
ys = iter((3, 4))
data = load(StringIO(json)) # data is a transient dict-like object
self.assertIsInstance(data, TransientStreamingJSONObject)
results = data['results']
self.assertIsInstance(results, TransientStreamingJSONList)
# iterate transient list, but produce persistent items
for result in results.persistent():
# result is a persistent dict-like object
self.assertIsInstance(result, PersistentStreamingJSONObject)
x = next(xs)
y = next(ys)
self.assertEqual(result['x'], x)
self.assertEqual(result['y'], y) # would error on second result without .persistent()
self.assertEqual(result['x'], x) # would error without .persistent()
# Calling transient() on a persistent dict must yield a TransientStreamingJSONList
# child while the parent's own keys stay re-readable; after the parent has been read
# past the list, indexing the list raises TransientAccessException.
def test_persistent_to_transient(self):
json = """{"a": 1, "x": ["long", "list", "I", "don't", "want", "in", "memory"], "b": 2}"""
data = load(StringIO(json), persistent=True).transient()
# transient() returns the same object, so it is still the persistent type
self.assertIsInstance(data, PersistentStreamingJSONObject)
self.assertEqual(data["a"], 1)
l = data["x"]
self.assertIsInstance(l, TransientStreamingJSONList)
self.assertEqual(data["b"], 2)  # "b" follows "x" in the JSON text
self.assertEqual(data["b"], 2) # would error if data was transient
with self.assertRaisesRegex(TransientAccessException, "Index 0 already passed in this stream"):
_ = l[0] # cannot access transient list
def _test_object(self, obj, persistent):
self.assertListEqual(list(self._to_data(obj, persistent)), list(obj))
self.assertListEqual(list(self._to_data(obj, persistent).keys()), list(obj.keys()))

View file

@ -1,4 +1,4 @@
from json_stream.base import TransientStreamingJSONBase, StreamingJSONObject, StreamingJSONList
from json_stream.base import StreamingJSONObject, StreamingJSONList, StreamingJSONBase
from json_stream.tokenizer import tokenize
@ -21,5 +21,5 @@ def _visit(obj, visitor, path):
def visit(fp, visitor):
token_stream = tokenize(fp)
_, token = next(token_stream)
obj = TransientStreamingJSONBase.factory(token, token_stream)
obj = StreamingJSONBase.factory(token, token_stream, persistent=False)
_visit(obj, visitor, ())