Tags: python, google-cloud-dataflow, apache-beam

I want to create lookup data using the apache_beam.utils.shared module, but it raises TypeError: cannot create weak reference to 'list' object


`

import apache_beam as beam
from apache_beam.utils import shared
from log_elements import LogElements

class GetNthStringFn(beam.DoFn):
  def __init__(self, shared_handle):
    self._shared_handle = shared_handle

  def process(self, element):
    def initialize_list():
      # Build the giant initial list.
      return [str(i) for i in range(1000000)]

    giant_list = self._shared_handle.acquire(initialize_list)
    yield giant_list[element]

with beam.Pipeline() as p:
  shared_handle = shared.Shared()

  (p | beam.Create([2, 4, 6, 8])
     | beam.ParDo(GetNthStringFn(shared_handle))
     | LogElements())

`

I tried the example given in the Apache Beam documentation (https://beam.apache.org/releases/pydoc/2.24.0/apache_beam.utils.shared.html) but got the error below:

File "/opt/playground/backend/executable_files/4a6babb9-9ea4-4a70-881d-559057592090/4a6babb9-9ea4-4a70-881d-559057592090.py", line 39, in process
    giant_list = self._shared_handle.acquire(initialize_list)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/shared.py", line 312, in acquire
    return _shared_map.acquire(self._key, constructor_fn, tag)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/shared.py", line 253, in acquire
    result = control_block.acquire(constructor_fn, tag)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/shared.py", line 149, in acquire
    self._ref = weakref.ref(result)
TypeError: cannot create weak reference to 'list' object [while running 'ParDo(GetNthStringFn)']


Solution

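  • You are not referencing the right link and version: Beam 2.24.0 is too old. The error itself comes from the fact that a plain list cannot be the target of a weak reference, which the shared module needs.

    Check the following code: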

    import apache_beam as beam
    from apache_beam.utils import shared

    # Several built-in types such as list and dict do not directly support weak
    # references but can add support through subclassing:
    # https://docs.python.org/3/library/weakref.html
    class WeakRefList(list):
      pass
    
    class GetNthStringFn(beam.DoFn):
      def __init__(self):
        self._shared_handle = shared.Shared()
    
      def setup(self):
        # setup is a good place to initialize transient in-memory resources.
        def initialize_list():
          # Build the giant initial list.
          return WeakRefList([str(i) for i in range(1000000)])
    
        self._giant_list = self._shared_handle.acquire(initialize_list)
    
      def process(self, element):
        yield self._giant_list[element]
    
    # Using the pipeline as a context manager runs it when the block exits.
    with beam.Pipeline() as p:
      (p | beam.Create([2, 4, 6, 8])
         | beam.ParDo(GetNthStringFn()))
    

    To run this code, you need Beam 2.42.0. You can install it with pip in your virtual environment (pip install apache-beam==2.42.0).
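
The root cause can be reproduced without Beam at all. The sketch below uses only the standard-library weakref module. The helper can_weakref and the WeakRefDict class are hypothetical names added for illustration and are not part of Beam. It shows that a plain list (or dict) rejects weak references, while a trivial subclass accepts them and otherwise behaves like the built-in type.

```python
import weakref


class WeakRefList(list):
    """Trivial list subclass; instances can be weakly referenced."""


class WeakRefDict(dict):
    """Same idea for dict-based lookup data (illustrative name)."""


def can_weakref(obj):
    """Return True if a weak reference to obj can be created."""
    try:
        weakref.ref(obj)
        return True
    except TypeError:
        # Same error as in the traceback:
        # "cannot create weak reference to 'list' object"
        return False


plain = [str(i) for i in range(5)]
wrapped = WeakRefList(plain)
lookup = WeakRefDict({"a": 1, "b": 2})

plain_ok = can_weakref(plain)
wrapped_ok = can_weakref(wrapped)
lookup_ok = can_weakref(lookup)

print(plain_ok, wrapped_ok, lookup_ok)  # False True True
print(wrapped[2], lookup["b"])          # indexing and key lookup work as usual
```

For dictionary-style lookup data, the same subclassing trick should work with the object returned from the function passed to acquire, as long as that object is an instance of the subclass rather than a plain dict.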