I'm trying to use intake to create a data catalogue for a JSON file. #197 mentions: "Essentially, you need to provide the reader function json.loads, if each of your files is a single JSON block which evaluates to a list of objects."
I created a test.json
{
"test": "test"
}
and (copying Data Engineering with Intake) tried
import json
import intake
source = intake.open_textfiles("test.json", decoder=json.loads)
print(source.yaml())
saved the output to source.yaml
sources:
  textfiles:
    args:
      decoder: !!python/name:json.loads ''
      urlpath: test.json
    description: ''
    driver: intake.source.textfiles.TextFilesSource
    metadata: {}
and tried opening it
cat = intake.open_catalog('source.yaml')
which produced:
---------------------------------------------------------------------------
ConstructorError Traceback (most recent call last)
<ipython-input-55-9b8e3a51ebc2> in <module>()
----> 1 cat = intake.open_catalog('source.yaml')
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/__init__.py in open_catalog(uri, **kwargs)
160 raise ValueError('Unknown catalog driver (%s), supply one of: %s'
161 % (driver, list(sorted(registry))))
--> 162 return registry[driver](uri, **kwargs)
163
164
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/catalog/local.py in __init__(self, path, autoreload, **kwargs)
550 self.autoreload = autoreload # set this to False if don't want reloads
551 self.filesystem = kwargs.pop('fs', None)
--> 552 super(YAMLFileCatalog, self).__init__(**kwargs)
553
554 def _load(self, reload=False):
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/catalog/base.py in __init__(self, name, description, metadata, auth, ttl, getenv, getshell, persist_mode, storage_options, *args)
111 self.updated = time.time()
112 self._entries = self._make_entries_container()
--> 113 self.force_reload()
114
115 @classmethod
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/catalog/base.py in force_reload(self)
168 def force_reload(self):
169 """Imperative reload data now"""
--> 170 self._load()
171 self.updated = time.time()
172
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/catalog/local.py in _load(self, reload)
580 logger.warning("Use of '!template' deprecated - fixing")
581 text = text.replace('!template ', '')
--> 582 self.parse(text)
583
584 def add(self, source, name=None, path=None, storage_options=None):
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/catalog/local.py in parse(self, text)
649 """
650 self.text = text
--> 651 data = yaml_load(self.text)
652
653 if data is None:
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/utils.py in yaml_load(stream)
73 """Parse YAML in a context where duplicate keys raise exception"""
74 with no_duplicate_yaml():
---> 75 return yaml.safe_load(stream)
76
77
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/yaml/__init__.py in safe_load(stream)
160 to be safe for untrusted input.
161 """
--> 162 return load(stream, SafeLoader)
163
164 def safe_load_all(stream):
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/yaml/__init__.py in load(stream, Loader)
112 loader = Loader(stream)
113 try:
--> 114 return loader.get_single_data()
115 finally:
116 loader.dispose()
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/yaml/constructor.py in get_single_data(self)
49 node = self.get_single_node()
50 if node is not None:
---> 51 return self.construct_document(node)
52 return None
53
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/yaml/constructor.py in construct_document(self, node)
53
54 def construct_document(self, node):
---> 55 data = self.construct_object(node)
56 while self.state_generators:
57 state_generators = self.state_generators
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/yaml/constructor.py in construct_object(self, node, deep)
98 constructor = self.__class__.construct_mapping
99 if tag_suffix is None:
--> 100 data = constructor(self, node)
101 else:
102 data = constructor(self, tag_suffix, node)
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/utils.py in no_duplicates_constructor(loader, node, deep)
30 for key_node, value_node in node.value:
31 key = loader.construct_object(key_node, deep=deep)
---> 32 value = loader.construct_object(value_node, deep=deep)
33 if key in mapping:
34 from intake.catalog.exceptions import DuplicateKeyError
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/yaml/constructor.py in construct_object(self, node, deep)
98 constructor = self.__class__.construct_mapping
99 if tag_suffix is None:
--> 100 data = constructor(self, node)
101 else:
102 data = constructor(self, tag_suffix, node)
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/utils.py in no_duplicates_constructor(loader, node, deep)
30 for key_node, value_node in node.value:
31 key = loader.construct_object(key_node, deep=deep)
---> 32 value = loader.construct_object(value_node, deep=deep)
33 if key in mapping:
34 from intake.catalog.exceptions import DuplicateKeyError
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/yaml/constructor.py in construct_object(self, node, deep)
98 constructor = self.__class__.construct_mapping
99 if tag_suffix is None:
--> 100 data = constructor(self, node)
101 else:
102 data = constructor(self, tag_suffix, node)
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/utils.py in no_duplicates_constructor(loader, node, deep)
30 for key_node, value_node in node.value:
31 key = loader.construct_object(key_node, deep=deep)
---> 32 value = loader.construct_object(value_node, deep=deep)
33 if key in mapping:
34 from intake.catalog.exceptions import DuplicateKeyError
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/yaml/constructor.py in construct_object(self, node, deep)
98 constructor = self.__class__.construct_mapping
99 if tag_suffix is None:
--> 100 data = constructor(self, node)
101 else:
102 data = constructor(self, tag_suffix, node)
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/intake/utils.py in no_duplicates_constructor(loader, node, deep)
30 for key_node, value_node in node.value:
31 key = loader.construct_object(key_node, deep=deep)
---> 32 value = loader.construct_object(value_node, deep=deep)
33 if key in mapping:
34 from intake.catalog.exceptions import DuplicateKeyError
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/yaml/constructor.py in construct_object(self, node, deep)
98 constructor = self.__class__.construct_mapping
99 if tag_suffix is None:
--> 100 data = constructor(self, node)
101 else:
102 data = constructor(self, tag_suffix, node)
/home/wsl-rowanm/miniconda3/envs/ireland-smartmeterdata/lib/python3.7/site-packages/yaml/constructor.py in construct_undefined(self, node)
427 raise ConstructorError(None, None,
428 "could not determine a constructor for the tag %r" % node.tag,
--> 429 node.start_mark)
430
431 SafeConstructor.add_constructor(
ConstructorError: could not determine a constructor for the tag 'tag:yaml.org,2002:python/name:json.loads'
in "<unicode string>", line 4, column 16:
decoder: !!python/name:json.loads ''
I'm not sure how to resolve this and would really appreciate any tips!
Unfortunately, that example is out of date. The reason is that we decided YAML files should be loaded in safe mode, so that any references to Python objects they contain are not executed. This means that you can always load an unknown catalogue and examine it before deciding whether you want to access any of its contents and possibly execute code.
As things stand, your workaround would be to get the data into memory and then decode it by hand, e.g.,
cat = intake.open_catalog('source.yaml')
cat.textfiles.to_dask().map(json.loads)  # via dask
[json.loads(obj) for obj in cat.textfiles.read()] # straight python
You could lodge an issue with Intake suggesting that one should be able to define a post-load processing step in textfiles using the fully-qualified name of the function ("json.loads" in this case) instead of a function object.
Also, we are planning to introduce a "derived datasource" in Intake which will more generally apply a function-and-arguments to any other data source, and that could be used for your case too - but it's not yet implemented.