mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-06-05 14:55:18 +02:00
action json
This commit is contained in:
parent
c2ad5a5036
commit
1799993447
8 changed files with 384 additions and 18 deletions
230
metagpt/utils/custom_decoder.py
Normal file
230
metagpt/utils/custom_decoder.py
Normal file
|
|
@ -0,0 +1,230 @@
|
|||
import json
|
||||
import re
|
||||
from json import scanner, JSONDecodeError
|
||||
from json.decoder import _decode_uXXXX
|
||||
NUMBER_RE = re.compile(
|
||||
r'(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?',
|
||||
(re.VERBOSE | re.MULTILINE | re.DOTALL))
|
||||
def py_make_scanner(context):
|
||||
parse_object = context.parse_object
|
||||
parse_array = context.parse_array
|
||||
parse_string = context.parse_string
|
||||
match_number = NUMBER_RE.match
|
||||
strict = context.strict
|
||||
parse_float = context.parse_float
|
||||
parse_int = context.parse_int
|
||||
parse_constant = context.parse_constant
|
||||
object_hook = context.object_hook
|
||||
object_pairs_hook = context.object_pairs_hook
|
||||
memo = context.memo
|
||||
|
||||
def _scan_once(string, idx):
|
||||
try:
|
||||
nextchar = string[idx]
|
||||
except IndexError:
|
||||
raise StopIteration(idx) from None
|
||||
|
||||
if nextchar == '"' or nextchar == "'":
|
||||
return parse_string(string, idx + 1, strict, delimiter=nextchar)
|
||||
elif nextchar == '{':
|
||||
return parse_object((string, idx + 1), strict,
|
||||
_scan_once, object_hook, object_pairs_hook, memo)
|
||||
elif nextchar == '[':
|
||||
return parse_array((string, idx + 1), _scan_once)
|
||||
elif nextchar == 'n' and string[idx:idx + 4] == 'null':
|
||||
return None, idx + 4
|
||||
elif nextchar == 't' and string[idx:idx + 4] == 'true':
|
||||
return True, idx + 4
|
||||
elif nextchar == 'f' and string[idx:idx + 5] == 'false':
|
||||
return False, idx + 5
|
||||
|
||||
m = match_number(string, idx)
|
||||
if m is not None:
|
||||
integer, frac, exp = m.groups()
|
||||
if frac or exp:
|
||||
res = parse_float(integer + (frac or '') + (exp or ''))
|
||||
else:
|
||||
res = parse_int(integer)
|
||||
return res, m.end()
|
||||
elif nextchar == 'N' and string[idx:idx + 3] == 'NaN':
|
||||
return parse_constant('NaN'), idx + 3
|
||||
elif nextchar == 'I' and string[idx:idx + 8] == 'Infinity':
|
||||
return parse_constant('Infinity'), idx + 8
|
||||
elif nextchar == '-' and string[idx:idx + 9] == '-Infinity':
|
||||
return parse_constant('-Infinity'), idx + 9
|
||||
else:
|
||||
raise StopIteration(idx)
|
||||
|
||||
def scan_once(string, idx):
|
||||
try:
|
||||
return _scan_once(string, idx)
|
||||
finally:
|
||||
memo.clear()
|
||||
|
||||
return scan_once
|
||||
FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
|
||||
STRINGCHUNK = re.compile(r'(.*?)(["\\\x00-\x1f])', FLAGS)
|
||||
STRINGCHUNK_SINGLEQUOTE = re.compile(r'(.*?)([\'\\\x00-\x1f])', FLAGS)
|
||||
BACKSLASH = {
|
||||
'"': '"', '\\': '\\', '/': '/',
|
||||
'b': '\b', 'f': '\f', 'n': '\n', 'r': '\r', 't': '\t',
|
||||
}
|
||||
WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)
|
||||
WHITESPACE_STR = ' \t\n\r'
|
||||
def JSONObject(s_and_end, strict, scan_once, object_hook, object_pairs_hook,
|
||||
memo=None, _w=WHITESPACE.match, _ws=WHITESPACE_STR):
|
||||
s, end = s_and_end
|
||||
pairs = []
|
||||
pairs_append = pairs.append
|
||||
# Backwards compatibility
|
||||
if memo is None:
|
||||
memo = {}
|
||||
memo_get = memo.setdefault
|
||||
# Use a slice to prevent IndexError from being raised, the following
|
||||
# check will raise a more specific ValueError if the string is empty
|
||||
nextchar = s[end:end + 1]
|
||||
# Normally we expect nextchar == '"'
|
||||
if nextchar != '"' and nextchar != "'":
|
||||
if nextchar in _ws:
|
||||
end = _w(s, end).end()
|
||||
nextchar = s[end:end + 1]
|
||||
# Trivial empty object
|
||||
if nextchar == '}':
|
||||
if object_pairs_hook is not None:
|
||||
result = object_pairs_hook(pairs)
|
||||
return result, end + 1
|
||||
pairs = {}
|
||||
if object_hook is not None:
|
||||
pairs = object_hook(pairs)
|
||||
return pairs, end + 1
|
||||
elif nextchar != '"':
|
||||
raise JSONDecodeError(
|
||||
"Expecting property name enclosed in double quotes", s, end)
|
||||
end += 1
|
||||
while True:
|
||||
key, end = scanstring(s, end, strict, delimiter=nextchar)
|
||||
key = memo_get(key, key)
|
||||
# To skip some function call overhead we optimize the fast paths where
|
||||
# the JSON key separator is ": " or just ":".
|
||||
if s[end:end + 1] != ':':
|
||||
end = _w(s, end).end()
|
||||
if s[end:end + 1] != ':':
|
||||
raise JSONDecodeError("Expecting ':' delimiter", s, end)
|
||||
end += 1
|
||||
|
||||
try:
|
||||
if s[end] in _ws:
|
||||
end += 1
|
||||
if s[end] in _ws:
|
||||
end = _w(s, end + 1).end()
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
try:
|
||||
value, end = scan_once(s, end)
|
||||
except StopIteration as err:
|
||||
raise JSONDecodeError("Expecting value", s, err.value) from None
|
||||
pairs_append((key, value))
|
||||
try:
|
||||
nextchar = s[end]
|
||||
if nextchar in _ws:
|
||||
end = _w(s, end + 1).end()
|
||||
nextchar = s[end]
|
||||
except IndexError:
|
||||
nextchar = ''
|
||||
end += 1
|
||||
|
||||
if nextchar == '}':
|
||||
break
|
||||
elif nextchar != ',':
|
||||
raise JSONDecodeError("Expecting ',' delimiter", s, end - 1)
|
||||
end = _w(s, end).end()
|
||||
nextchar = s[end:end + 1]
|
||||
end += 1
|
||||
if nextchar != '"':
|
||||
raise JSONDecodeError(
|
||||
"Expecting property name enclosed in double quotes", s, end - 1)
|
||||
if object_pairs_hook is not None:
|
||||
result = object_pairs_hook(pairs)
|
||||
return result, end
|
||||
pairs = dict(pairs)
|
||||
if object_hook is not None:
|
||||
pairs = object_hook(pairs)
|
||||
return pairs, end
|
||||
def py_scanstring(s, end, strict=True,
|
||||
_b=BACKSLASH, _m=STRINGCHUNK.match,delimiter='"'):
|
||||
"""Scan the string s for a JSON string. End is the index of the
|
||||
character in s after the quote that started the JSON string.
|
||||
Unescapes all valid JSON string escape sequences and raises ValueError
|
||||
on attempt to decode an invalid string. If strict is False then literal
|
||||
control characters are allowed in the string.
|
||||
|
||||
Returns a tuple of the decoded string and the index of the character in s
|
||||
after the end quote."""
|
||||
chunks = []
|
||||
_append = chunks.append
|
||||
begin = end - 1
|
||||
if delimiter == '"':
|
||||
_m = STRINGCHUNK.match
|
||||
else:
|
||||
_m = STRINGCHUNK_SINGLEQUOTE.match
|
||||
while 1:
|
||||
chunk = _m(s, end)
|
||||
if chunk is None:
|
||||
raise JSONDecodeError("Unterminated string starting at", s, begin)
|
||||
end = chunk.end()
|
||||
content, terminator = chunk.groups()
|
||||
# Content is contains zero or more unescaped string characters
|
||||
if content:
|
||||
_append(content)
|
||||
# Terminator is the end of string, a literal control character,
|
||||
# or a backslash denoting that an escape sequence follows
|
||||
if terminator == delimiter:
|
||||
break
|
||||
elif terminator != '\\':
|
||||
if strict:
|
||||
#msg = "Invalid control character %r at" % (terminator,)
|
||||
msg = "Invalid control character {0!r} at".format(terminator)
|
||||
raise JSONDecodeError(msg, s, end)
|
||||
else:
|
||||
_append(terminator)
|
||||
continue
|
||||
try:
|
||||
esc = s[end]
|
||||
except IndexError:
|
||||
raise JSONDecodeError("Unterminated string starting at",
|
||||
s, begin) from None
|
||||
# If not a unicode escape sequence, must be in the lookup table
|
||||
if esc != 'u':
|
||||
try:
|
||||
char = _b[esc]
|
||||
except KeyError:
|
||||
msg = "Invalid \\escape: {0!r}".format(esc)
|
||||
raise JSONDecodeError(msg, s, end)
|
||||
end += 1
|
||||
else:
|
||||
uni = _decode_uXXXX(s, end)
|
||||
end += 5
|
||||
if 0xd800 <= uni <= 0xdbff and s[end:end + 2] == '\\u':
|
||||
uni2 = _decode_uXXXX(s, end + 1)
|
||||
if 0xdc00 <= uni2 <= 0xdfff:
|
||||
uni = 0x10000 + (((uni - 0xd800) << 10) | (uni2 - 0xdc00))
|
||||
end += 6
|
||||
char = chr(uni)
|
||||
_append(char)
|
||||
return ''.join(chunks), end
|
||||
scanstring = py_scanstring
|
||||
class CustomDecoder(json.JSONDecoder):
|
||||
def __init__(self, *, object_hook=None, parse_float=None,
|
||||
parse_int=None, parse_constant=None, strict=True,
|
||||
object_pairs_hook=None):
|
||||
super().__init__(object_hook=object_hook, parse_float=parse_float, parse_int=parse_int,
|
||||
parse_constant=parse_constant, strict=strict,object_pairs_hook=object_pairs_hook)
|
||||
self.parse_object = JSONObject
|
||||
self.parse_string = py_scanstring
|
||||
self.scan_once = py_make_scanner(self)
|
||||
|
||||
def decode(self, s, _w=json.decoder.WHITESPACE.match):
|
||||
|
||||
return super().decode(s)
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue