`
import apache_beam as beam
from apache_beam.utils import shared
from log_elements import LogElements
class GetNthStringFn(beam.DoFn):
def __init__(self, shared_handle):
self._shared_handle = shared_handle
def process(self, element):
def initialize_list():
# Build the giant initial list.
return [str(i) for i in range(1000000)]
giant_list = self._shared_handle.acquire(initialize_list)
yield giant_list[element]
with beam.Pipeline() as p:
shared_handle = shared.Shared()
(p | beam.Create([2, 4, 6, 8])
| beam.ParDo(GetNthStringFn(shared_handle))
| LogElements())
`
I tried example given in apache beam documentation. Link https://beam.apache.org/releases/pydoc/2.24.0/apache_beam.utils.shared.html
but got below error
File "/opt/playground/backend/executable_files/4a6babb9-9ea4-4a70-881d-559057592090/4a6babb9-9ea4-4a70-881d-559057592090.py", line 39, in process
giant_list = self._shared_handle.acquire(initialize_list)
File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/shared.py", line 312, in acquire
return _shared_map.acquire(self._key, constructor_fn, tag)
File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/shared.py", line 253, in acquire
result = control_block.acquire(constructor_fn, tag)
File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/shared.py", line 149, in acquire
self._ref = weakref.ref(result)
TypeError: cannot create weak reference to 'list' object [while running 'ParDo(GetNthStringFn)']
You don't reference the good link and version, the Beam
version 2.24.0
is too old.
Check with this code and this link :
# Several built-in types such as list and dict do not directly support weak
# references but can add support through subclassing:
# https://docs.python.org/3/library/weakref.html
class WeakRefList(list):
pass
class GetNthStringFn(beam.DoFn):
def __init__(self):
self._shared_handle = shared.Shared()
def setup(self):
# setup is a good place to initialize transient in-memory resources.
def initialize_list():
# Build the giant initial list.
return WeakRefList([str(i) for i in range(1000000)])
self._giant_list = self._shared_handle.acquire(initialize_list)
def process(self, element):
yield self._giant_list[element]
p = beam.Pipeline()
(p | beam.Create([2, 4, 6, 8])
| beam.ParDo(GetNthStringFn()))
To be able to execute this code, you need to install the Beam
2.42.0
version. You can install it with pip in your virtual env.