Initial commit

Jamie Cockburn 2020-07-28 20:54:10 +01:00
commit 79d1d0f4fb
9 changed files with 787 additions and 0 deletions

19
LICENSE.txt Normal file
View file

@@ -0,0 +1,19 @@
Copyright (c) 2020 Jamie Cockburn
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

172
README.md Normal file
View file

@@ -0,0 +1,172 @@
# json-stream
Simple streaming JSON parser.
`json-stream` is a JSON parser just like the standard library's
[`json.load()`](https://docs.python.org/3/library/json.html#json.load). It
will read a JSON document and convert it into native Python types.
Features:
* stream all JSON data types (objects or lists)
* stream nested data
* simple pythonic `list`-like/`dict`-like interface
Unlike `json.load()`, `json-stream` can _stream_ JSON data from a file-like
object. This has the following benefits:
* It does not require the whole JSON document to be read into memory up-front
* It can start producing data before the entire document has finished loading
* It only requires enough memory to hold the data currently being parsed
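For example (a minimal sketch; `io.StringIO` stands in here for any file-like object, such as an open file or an HTTP response body):
```python
import io
import json_stream

# any file-like object works: an open file, a socket wrapper, io.StringIO, ...
f = io.StringIO('{"results": [1, 2, 3]}')

data = json_stream.load(f)     # nothing beyond the first token has been read yet
for value in data["results"]:  # items are parsed as they are pulled from the stream
    print(value)               # prints 1, 2, 3
```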
## What are the problems with standard `json.load()`?
The problems with `json.load()` stem from the fact that it must read
the whole JSON document into memory before parsing it.
### Memory usage
`json.load()` first reads the whole document into memory as a string. It
then starts parsing that string, converting the whole document into Python objects
that are, again, stored in memory. For a very large document, this could be more
memory than you have available on your system.
`json-stream` does not read the whole document into memory, it only buffers
enough from the stream to produce the next item of data.
In transient mode (see below), `json-stream` also doesn't store all of the
parsed data in memory.
### Latency
`json.load()` produces all the data after parsing the whole document. If you
only care about the first 10 items in a list of 2 million items, then you
have to wait until all 2 million items have been parsed first.
`json-stream` produces data as soon as it is available in the stream.
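To illustrate (an illustrative sketch; the in-memory `io.StringIO` document stands in for a large or slow stream):
```python
import io
import json_stream
from itertools import islice

# stand-in for a huge document arriving over a slow connection
f = io.StringIO("[" + ", ".join(str(i) for i in range(1000000)) + "]")

data = json_stream.load(f)
first_ten = list(islice(data, 10))  # parses only ~10 items, not all 1,000,000
print(first_ten)                    # prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```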
## Usage
`json_stream.load()` has two modes of operation, controlled by
the `persistent` argument (default `False`).
### Transient mode (default)
This mode is appropriate if you can consume the data iteratively. It is also
the mode you must use if you do not want to use all the memory required to store
the entire parsed result.
In transient mode, only the data currently being read is stored in memory. Any
data previously read from the stream is discarded (it's up to you what to do
with it) and attempting to access this data results in a `TransientAccessException`.
```python
import json_stream
# JSON: {"x": 1, "y": ["a", "b", "c"]}
data = json_stream.load(f) # {"x": 1, "y": ['a', 'b', 'c']}
# use data like a list or dict
y = data["y"]
# already read past "x" in stream -> exception
x = data["x"]
# iterate
for c in y:
print(c) # prints a, b, c
# already read from list -> exception
for c in y: pass
```
### Persistent mode
In persistent mode all previously read data is stored in memory as
it is parsed. The returned `dict`-like or `list`-like objects
can be used just like normal data structures.
If you request an index or key that has already been read from the stream
then it is retrieved from memory. If you request an index or key that has
not yet been read from the stream, then the request blocks until that item
is found in the stream.
```python
import json_stream
# JSON: {"x": 1, "y": ["a", "b", "c"]}
data = json_stream.load(f, persistent=True)
# use data like a list or dict
# stream is read up to the middle of the list
b = data["y"][1] # b = "b"
# read from memory
x = data["x"] # x = 1
```
Persistent mode is not appropriate if you care about memory consumption, but it
provides an experience identical to `json.load()`.
## Visitor pattern
You can also parse using a visitor-style approach where a function you supply
is called for each data item as it is parsed (depth-first).
This uses a transient parser under the hood, so does not consume memory for
the whole document.
```python
import json_stream
# JSON: {"x": 1, "y": {}, "xxxx": [1,2, {"yyyy": 1}, "z", 1, []]}
def visitor(data, path):
print(f"{path}: {data}")
json_stream.visit(f, visitor)
```
Output:
```
('x',): 1
('y',): {}
('xxxx', 0): 1
('xxxx', 1): 2
('xxxx', 2, 'yyyy'): 1
('xxxx', 3): z
('xxxx', 4): 1
('xxxx', 5): []
```
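Since the visitor receives the path of every item, it is easy to pick out just the parts of the document you care about. A small sketch (the filtering logic here is illustrative, not part of the library):
```python
import io
import json_stream

f = io.StringIO('{"x": 1, "y": {}, "xxxx": [1, 2, {"yyyy": 1}, "z", 1, []]}')

def visitor(data, path):
    # only report values inside the top-level "xxxx" list
    if path and path[0] == "xxxx":
        print(f"xxxx[{path[1]}]: {data}")

json_stream.visit(f, visitor)  # prints one line per item under "xxxx"
```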
# Future improvements
* Allow long strings in the JSON to be read as streams themselves
* Allow transient mode on seekable streams to seek to data earlier in
the stream instead of raising a `TransientAccessException`
* A more efficient tokenizer?
# Alternatives
## NAYA
[NAYA](https://github.com/danielyule/naya) is a pure python JSON parser for
parsing a simple JSON list as a stream.
### Why not NAYA?
* It can only stream JSON containing a top-level list
* It does not provide a pythonic `dict`/`list`-like interface
## Yajl-Py
[Yajl-Py]() is a wrapper around the Yajl JSON library that can be used to
generate SAX style events while parsing JSON.
### Why not Yajl-Py?
* It's not pure python
* It does not provide a pythonic `dict`/`list`-like interface
# Acknowledgements
The JSON tokenizer used in the project was taken from the [NAYA](https://github.com/danielyule/naya) project.

50
setup.py Normal file
View file

@@ -0,0 +1,50 @@
from setuptools import setup, find_packages
import pathlib
here = pathlib.Path(__file__).parent.resolve()
# Get the long description from the README file
long_description = (here / 'README.md').read_text(encoding='utf-8')
setup(
name='json-stream',
version='1.0.0',
description='Streaming JSON decoder',
long_description=long_description,
long_description_content_type='text/markdown',
url='https://github.com/daggaz/json-stream',
author='Jamie Cockburn',
author_email='jamie_cockburn@hotmail.co.uk', # Optional
classifiers=[ # Optional
# How mature is this project? Common values are
# 3 - Alpha
# 4 - Beta
# 5 - Production/Stable
'Development Status :: 3 - Alpha',
# Indicate who your project is intended for
'Intended Audience :: Developers',
'Topic :: Software Development :: Libraries',
# Pick your license as you wish
'License :: OSI Approved :: MIT License',
# Specify the Python versions you support here. In particular, ensure
# that you indicate you support Python 3. These classifiers are *not*
# checked by 'pip install'. See instead 'python_requires' below.
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3 :: Only',
],
    keywords='json, stream, streaming, parser',
package_dir={'': 'src'},
packages=find_packages(where='src'),
    python_requires='>=3.6, <4',
project_urls={
'Bug Reports': 'https://github.com/daggaz/json-stream/issues',
'Source': 'https://github.com/daggaz/json-stream/',
},
)

2
src/json_stream/__init__.py Normal file
View file

@@ -0,0 +1,2 @@
from json_stream.loader import load
from json_stream.visitor import visit

173
src/json_stream/base.py Normal file
View file

@@ -0,0 +1,173 @@
import collections
from collections import OrderedDict
from itertools import chain
from typing import Sized, Optional, Iterator, Sequence, Union, Any, Mapping
from json_stream.tokenizer import TokenType as TOKEN_TYPE
class TransientAccessException(Exception):
pass
class StreamingJSONStates:
STREAMING = 'STREAMING'
DONE = 'DONE'
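# Lazy list/dict-like wrapper over a token stream. Items are parsed on demand;
# in persistent mode, everything parsed so far is also kept in self._data.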
class StreamingJSONBase(Sized):
@classmethod
def factory(cls, token, token_stream, persistent):
if token == '{':
return StreamingJSONObject(token_stream, persistent)
if token == '[':
return StreamingJSONList(token_stream, persistent)
raise ValueError(f"Unknown operator {token}")
def __init__(self, token_stream, persistent):
self._state = StreamingJSONStates.STREAMING
self._stream = token_stream
self._child: Optional[StreamingJSONBase] = None
self._data = self._init_persistent_data() if persistent else None
self._i = -1
@property
def persistent(self):
return self._data is not None
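    # before reading the next item, any partially-read child container must be
    # drained so the parent's token stream is positioned after it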
def _clear_child(self):
if self._child is not None:
self._child.read_all()
self._child = None
    def _iter_items(self):
        while True:
            try:
                yield self._next()
            except StopIteration:
                return
def _next(self):
if not self.is_streaming():
raise StopIteration()
self._clear_child()
item = self._load_item()
self._i += 1
return item
def _done(self):
self._state = StreamingJSONStates.DONE
raise StopIteration()
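    # exhaust the remaining items, discarding them (a deque with maxlen=0
    # consumes an iterator without storing anything)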
def read_all(self):
collections.deque(self._iter_items(), maxlen=0)
def _iter(self):
return self._iter_items()
def _init_persistent_data(self):
raise NotImplementedError()
def _load_item(self):
raise NotImplementedError()
def is_streaming(self):
        return self._state == StreamingJSONStates.STREAMING
def __iter__(self) -> Iterator[str]:
if self.persistent:
return chain(self._data, self._iter())
if self._i != -1:
raise TransientAccessException("Cannot restart iteration of transient JSON stream")
return self._iter()
def __len__(self) -> int:
self.read_all()
return self._i + 1
def __repr__(self):
return f"<{type(self).__name__}: {repr(self._data)}, {self._state}>"
class StreamingJSONList(StreamingJSONBase, Sequence):
def __init__(self, token_stream, persistent):
super().__init__(token_stream, persistent)
def _init_persistent_data(self):
return []
def _load_item(self):
token_type, v = next(self._stream)
if token_type == TOKEN_TYPE.OPERATOR:
if v == ']':
self._done()
if v == ',':
token_type, v = next(self._stream)
else:
raise ValueError(f"Expecting value, comma or ], got {v}")
if token_type == TOKEN_TYPE.OPERATOR:
self._child = v = StreamingJSONBase.factory(v, self._stream, self.persistent)
if self._data is not None:
self._data.append(v)
return v
def _find_item(self, i):
if self._i >= i:
raise TransientAccessException(f"Index {i} already passed in this stream")
for v in iter(self._iter_items()):
if self._i == i:
return v
raise IndexError(f"Index {i} out of range")
def __getitem__(self, i: Union[int, slice]) -> Any:
if self.persistent:
try:
return self._data[i]
except IndexError:
pass
return self._find_item(i)
class StreamingJSONObject(StreamingJSONBase, Mapping):
def _init_persistent_data(self):
return OrderedDict()
def _iter(self):
return (k for k, v in self._iter_items())
def items(self):
return self._iter_items()
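    # read one "key": value pair, recursing into a child stream for nested
    # objects/arrays; '}' ends the object, ',' separates pairs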
def _load_item(self):
token_type, k = next(self._stream)
if token_type == TOKEN_TYPE.OPERATOR:
if k == '}':
self._done()
if k == ',':
token_type, k = next(self._stream)
if token_type != TOKEN_TYPE.STRING:
raise ValueError(f"Expecting string, comma or }}, got {k} ({token_type})")
token_type, token = next(self._stream)
if token_type != TOKEN_TYPE.OPERATOR or token != ":":
raise ValueError("Expecting :")
token_type, v = next(self._stream)
if token_type == TOKEN_TYPE.OPERATOR:
self._child = v = StreamingJSONBase.factory(v, self._stream, self.persistent)
if self._data is not None:
self._data[k] = v
return k, v
def _find_item(self, k):
for next_k, v in iter(self._iter_items()):
if next_k == k:
return v
if self.persistent:
raise KeyError(k)
raise TransientAccessException(f"{k} not found in transient JSON stream or already passed in this stream")
def __getitem__(self, k) -> Any:
if self.persistent:
try:
return self._data[k]
except KeyError:
pass
return self._find_item(k)

8
src/json_stream/loader.py Normal file
View file

@@ -0,0 +1,8 @@
from json_stream.base import StreamingJSONBase
from json_stream.tokenizer import tokenize
def load(fp, persistent=False):
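    # read only the first token ('{' or '[') from the stream and wrap it in a
    # lazy list/dict-like object; the rest of the document is parsed on demand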
token_stream = tokenize(fp)
_, token = next(token_stream)
return StreamingJSONBase.factory(token, token_stream, persistent)

View file

337
src/json_stream/tokenizer.py Normal file
View file

@@ -0,0 +1,337 @@
"""
Taken from the NAYA project
https://github.com/danielyule/naya
Copyright (c) 2019 Daniel Yule
"""
class TokenType:
OPERATOR = 0
STRING = 1
NUMBER = 2
BOOLEAN = 3
NULL = 4
class State:
WHITESPACE = 0
INTEGER_0 = 1
INTEGER_SIGN = 2
INTEGER = 3
INTEGER_EXP = 4
INTEGER_EXP_0 = 5
FLOATING_POINT_0 = 6
FLOATING_POINT = 8
STRING = 9
STRING_ESCAPE = 10
STRING_END = 11
TRUE_1 = 12
TRUE_2 = 13
TRUE_3 = 14
FALSE_1 = 15
FALSE_2 = 16
FALSE_3 = 17
FALSE_4 = 18
NULL_1 = 19
NULL_2 = 20
NULL_3 = 21
UNICODE_1 = 22
UNICODE_2 = 23
UNICODE_3 = 24
UNICODE_4 = 25
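# Character-at-a-time state machine. tokenize() yields (token_type, value)
# pairs such as (TokenType.OPERATOR, '{') or (TokenType.NUMBER, 42).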
def tokenize(stream):
def is_delimiter(char):
return char.isspace() or char in "{}[]:,"
token = []
charcode = 0
completed = False
now_token = ""
def process_char(char, charcode):
nonlocal token, completed, now_token
advance = True
add_char = False
next_state = state
if state == State.WHITESPACE:
if char == "{":
completed = True
now_token = (TokenType.OPERATOR, "{")
elif char == "}":
completed = True
now_token = (TokenType.OPERATOR, "}")
elif char == "[":
completed = True
now_token = (TokenType.OPERATOR, "[")
elif char == "]":
completed = True
now_token = (TokenType.OPERATOR, "]")
elif char == ",":
completed = True
now_token = (TokenType.OPERATOR, ",")
elif char == ":":
completed = True
now_token = (TokenType.OPERATOR, ":")
elif char == "\"":
next_state = State.STRING
elif char in "123456789":
next_state = State.INTEGER
add_char = True
elif char == "0":
next_state = State.INTEGER_0
add_char = True
elif char == "-":
next_state = State.INTEGER_SIGN
add_char = True
elif char == "f":
next_state = State.FALSE_1
elif char == "t":
next_state = State.TRUE_1
elif char == "n":
next_state = State.NULL_1
elif not char.isspace():
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.INTEGER:
if char in "0123456789":
add_char = True
elif char == ".":
next_state = State.FLOATING_POINT_0
add_char = True
elif char == "e" or char == 'E':
next_state = State.INTEGER_EXP_0
add_char = True
elif is_delimiter(char):
next_state = State.WHITESPACE
completed = True
now_token = (TokenType.NUMBER, int("".join(token)))
advance = False
else:
raise ValueError("A number must contain only digits. Got '{}'".format(char))
elif state == State.INTEGER_0:
if char == ".":
next_state = State.FLOATING_POINT_0
add_char = True
elif char == "e" or char == 'E':
next_state = State.INTEGER_EXP_0
add_char = True
elif is_delimiter(char):
next_state = State.WHITESPACE
completed = True
now_token = (TokenType.NUMBER, 0)
advance = False
else:
raise ValueError("A 0 must be followed by a '.' or a 'e'. Got '{0}'".format(char))
elif state == State.INTEGER_SIGN:
if char == "0":
next_state = State.INTEGER_0
add_char = True
elif char in "123456789":
next_state = State.INTEGER
add_char = True
else:
raise ValueError("A - must be followed by a digit. Got '{0}'".format(char))
elif state == State.INTEGER_EXP_0:
if char == "+" or char == "-" or char in "0123456789":
next_state = State.INTEGER_EXP
add_char = True
else:
raise ValueError("An e in a number must be followed by a '+', '-' or digit. Got '{0}'".format(char))
elif state == State.INTEGER_EXP:
if char in "0123456789":
add_char = True
elif is_delimiter(char):
completed = True
now_token = (TokenType.NUMBER, float("".join(token)))
next_state = State.WHITESPACE
advance = False
else:
raise ValueError("A number exponent must consist only of digits. Got '{}'".format(char))
elif state == State.FLOATING_POINT:
if char in "0123456789":
add_char = True
elif char == "e" or char == "E":
next_state = State.INTEGER_EXP_0
add_char = True
elif is_delimiter(char):
completed = True
now_token = (TokenType.NUMBER, float("".join(token)))
next_state = State.WHITESPACE
advance = False
else:
raise ValueError("A number must include only digits")
elif state == State.FLOATING_POINT_0:
if char in "0123456789":
next_state = State.FLOATING_POINT
add_char = True
else:
raise ValueError("A number with a decimal point must be followed by a fractional part")
elif state == State.FALSE_1:
if char == "a":
next_state = State.FALSE_2
else:
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.FALSE_2:
if char == "l":
next_state = State.FALSE_3
else:
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.FALSE_3:
if char == "s":
next_state = State.FALSE_4
else:
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.FALSE_4:
if char == "e":
next_state = State.WHITESPACE
completed = True
now_token = (TokenType.BOOLEAN, False)
else:
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.TRUE_1:
if char == "r":
next_state = State.TRUE_2
else:
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.TRUE_2:
if char == "u":
next_state = State.TRUE_3
else:
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.TRUE_3:
if char == "e":
next_state = State.WHITESPACE
completed = True
now_token = (TokenType.BOOLEAN, True)
else:
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.NULL_1:
if char == "u":
next_state = State.NULL_2
else:
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.NULL_2:
if char == "l":
next_state = State.NULL_3
else:
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.NULL_3:
if char == "l":
next_state = State.WHITESPACE
completed = True
now_token = (TokenType.NULL, None)
else:
raise ValueError("Invalid JSON character: '{0}'".format(char))
elif state == State.STRING:
if char == "\"":
completed = True
now_token = (TokenType.STRING, "".join(token))
next_state = State.STRING_END
elif char == "\\":
next_state = State.STRING_ESCAPE
else:
add_char = True
elif state == State.STRING_END:
if is_delimiter(char):
advance = False
next_state = State.WHITESPACE
else:
raise ValueError("Expected whitespace or an operator after strin. Got '{}'".format(char))
elif state == State.STRING_ESCAPE:
next_state = State.STRING
if char == "\\" or char == "\"":
add_char = True
elif char == "b":
char = "\b"
add_char = True
elif char == "f":
char = "\f"
add_char = True
elif char == "n":
char = "\n"
add_char = True
elif char == "t":
char = "\t"
add_char = True
elif char == "r":
char = "\r"
add_char = True
elif char == "/":
char = "/"
add_char = True
elif char == "u":
next_state = State.UNICODE_1
charcode = 0
else:
raise ValueError("Invalid string escape: {}".format(char))
elif state == State.UNICODE_1:
if char in "0123456789":
charcode = (ord(char) - 48) * 4096
elif char in "abcdef":
charcode = (ord(char) - 87) * 4096
elif char in "ABCDEF":
charcode = (ord(char) - 55) * 4096
else:
raise ValueError("Invalid character code: {}".format(char))
next_state = State.UNICODE_2
char = ""
elif state == State.UNICODE_2:
if char in "0123456789":
charcode += (ord(char) - 48) * 256
elif char in "abcdef":
charcode += (ord(char) - 87) * 256
elif char in "ABCDEF":
charcode += (ord(char) - 55) * 256
else:
raise ValueError("Invalid character code: {}".format(char))
next_state = State.UNICODE_3
char = ""
elif state == State.UNICODE_3:
if char in "0123456789":
charcode += (ord(char) - 48) * 16
elif char in "abcdef":
charcode += (ord(char) - 87) * 16
elif char in "ABCDEF":
charcode += (ord(char) - 55) * 16
else:
raise ValueError("Invalid character code: {}".format(char))
next_state = State.UNICODE_4
char = ""
elif state == State.UNICODE_4:
if char in "0123456789":
charcode += ord(char) - 48
elif char in "abcdef":
charcode += ord(char) - 87
elif char in "ABCDEF":
charcode += ord(char) - 55
else:
raise ValueError("Invalid character code: {}".format(char))
next_state = State.STRING
char = chr(charcode)
add_char = True
if add_char:
token.append(char)
return advance, next_state, charcode
state = State.WHITESPACE
char = stream.read(1)
index = 0
while char:
try:
advance, state, charcode = process_char(char, charcode)
except ValueError as e:
raise ValueError("".join([e.args[0], " at index {}".format(index)]))
if completed:
completed = False
token = []
yield now_token
if advance:
char = stream.read(1)
index += 1
process_char(" ", charcode)
if completed:
yield now_token

26
src/json_stream/visitor.py Normal file
View file

@@ -0,0 +1,26 @@
from json_stream.tokenizer import tokenize
from json_stream.base import StreamingJSONBase, StreamingJSONList, StreamingJSONObject
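# Depth-first walk: each leaf value is passed to the visitor together with its
# path, a tuple of keys/indices; empty containers are reported as {} or [].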
def _visit(obj, visitor, path):
k = None
if isinstance(obj, StreamingJSONObject):
for k, v in obj.items():
_visit(v, visitor, path + (k,))
if k is None:
visitor({}, path)
elif isinstance(obj, StreamingJSONList):
for k, v in enumerate(obj):
_visit(v, visitor, path + (k,))
if k is None:
visitor([], path)
else:
visitor(obj, path)
def visit(fp, visitor):
token_stream = tokenize(fp)
_, token = next(token_stream)
    obj = StreamingJSONBase.factory(token, token_stream, persistent=False)
_visit(obj, visitor, ())