Tags: python, google-cloud-platform, cloud-document-ai

How to process a single GCS-stored file on Document AI with the Python client?


I have been testing out the Google Document AI Python client, but I can't get the process_document() function to work when processing a single document stored on Google Cloud Storage.

What I currently have:

  • A working quickstart example, which processes a single locally stored document
  • A working batch processing example, which processes multiple GCS-stored documents

Other options I've tried:

  • I have tried modifying the quickstart example, but have been unable to get it to work with GCS-stored files (more details below)
  • I can use the batch processing code for a single file. However, batch processing appears to be significantly slower than processing locally stored files
  • Another option I have considered is downloading my GCS-stored file and re-uploading it as a local payload, but this seems both a waste of bandwidth and inelegant (see the sketch after this list)
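
For reference, the download-and-process workaround would look roughly like this (a sketch only; the project, processor, bucket, and object names are placeholders, and the downloaded bytes are sent as a RawDocument payload):

from google.api_core.client_options import ClientOptions
from google.cloud import documentai, storage

opts = ClientOptions(api_endpoint="us-documentai.googleapis.com")
client = documentai.DocumentProcessorServiceClient(client_options=opts)
name = client.processor_path("my_project_id", "us", "my_processor_id")

# Download the file from GCS (placeholder bucket/object names)
image_bytes = (
    storage.Client().bucket("my_bucket").blob("my_image.jpeg").download_as_bytes()
)

# Send the downloaded bytes as a local payload
raw_document = documentai.RawDocument(content=image_bytes, mime_type="image/jpeg")
request = documentai.ProcessRequest(name=name, raw_document=raw_document)
result = client.process_document(request=request)
print(result.document.text)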

Which brings me to the question: how do you process a single GCS-stored file with the Python client?


I have tried modifying the quickstart example by replacing the RawDocument class with other classes that take a GCS URI as input, but it didn't work:

  • The class GcsDocument isn't accepted by ProcessRequest to begin with.
    Trying to pass GcsDocument to either the raw_document or inline_document attribute anyway will raise the error:
    TypeError: Message must be initialized with a dict: google.cloud.documentai.v1.ProcessRequest

  • The class Document appears to be usable with ProcessRequest(inline_document=Document()). However, even following the example provided for process_document() then raises the error:
    400 Only content payload is supported for Sync Process.

Here is a code snippet that will raise this error:

from google.api_core.client_options import ClientOptions
from google.cloud import documentai

opts = ClientOptions(api_endpoint="us-documentai.googleapis.com")
client = documentai.DocumentProcessorServiceClient(client_options=opts)
name = client.processor_path("my_project_id", "us", "my_processor_id")

gcs_document = documentai.Document(
    uri="gs://my_bucket/my_image.jpeg",
    mime_type="image/jpeg",
)
request = documentai.ProcessRequest(
    name=name,
    inline_document=gcs_document,
)

# 400 Only content payload is supported for Sync Process.
result = client.process_document(request=request)

Solution

  • You cannot use Online Processing process_document() for files in Google Cloud Storage without downloading them locally.

    Update: Online Processing has since been updated to allow processing a single file stored in Google Cloud Storage. You can reference the file using the gcs_document parameter in the processing request, as sketched below.

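    A minimal sketch of the updated online processing call (this assumes a recent google-cloud-documentai release in which ProcessRequest accepts a gcs_document field; the project, processor, bucket, and object names are placeholders):

    from google.api_core.client_options import ClientOptions
    from google.cloud import documentai

    opts = ClientOptions(api_endpoint="us-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=opts)
    name = client.processor_path("my_project_id", "us", "my_processor_id")

    # GcsDocument references the file in Cloud Storage; no local download needed
    gcs_document = documentai.GcsDocument(
        gcs_uri="gs://my_bucket/my_image.jpeg",
        mime_type="image/jpeg",
    )
    request = documentai.ProcessRequest(
        name=name,
        gcs_document=gcs_document,
    )

    result = client.process_document(request=request)
    print(result.document.text)
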
    Alternatively, you can use Batch Processing batch_process_documents() to process a single document from Google Cloud Storage. It works exactly the same as processing multiple documents, except you supply a single document instead of a prefix or a list of documents. The input and output are both in Google Cloud Storage, and the processing is asynchronous.

    The Python code sample in Send a processing request > Batch processing shows exactly what you are looking for. Look for the ############## marker comments in the code below.

    import re
    from typing import Optional
    
    from google.api_core.client_options import ClientOptions
    from google.api_core.exceptions import InternalServerError
    from google.api_core.exceptions import RetryError
    from google.cloud import documentai  # type: ignore
    from google.cloud import storage
    
    # TODO(developer): Uncomment these variables before running the sample.
    # project_id = "YOUR_PROJECT_ID"
    # location = "YOUR_PROCESSOR_LOCATION" # Format is "us" or "eu"
    # processor_id = "YOUR_PROCESSOR_ID" # Create processor before running sample
    # gcs_output_uri = "YOUR_OUTPUT_URI" # Must end with a trailing slash `/`. Format: gs://bucket/directory/subdirectory/
    # processor_version_id = "YOUR_PROCESSOR_VERSION_ID" # Optional. Example: pretrained-ocr-v1.0-2020-09-23
    
    # TODO(developer): You must specify either `gcs_input_uri` and `mime_type` or `gcs_input_prefix`
    # gcs_input_uri = "YOUR_INPUT_URI" # Format: gs://bucket/directory/file.pdf
    # input_mime_type = "application/pdf"
    # gcs_input_prefix = "YOUR_INPUT_URI_PREFIX" # Format: gs://bucket/directory/
    # field_mask = "text,entities,pages.pageNumber"  # Optional. The fields to return in the Document object.
    
    
    def batch_process_documents(
        project_id: str,
        location: str,
        processor_id: str,
        gcs_output_uri: str,
        processor_version_id: Optional[str] = None,
        gcs_input_uri: Optional[str] = None,
        input_mime_type: Optional[str] = None,
        gcs_input_prefix: Optional[str] = None,
        field_mask: Optional[str] = None,
        timeout: int = 400,
    ) -> None:
        # You must set the `api_endpoint` if you use a location other than "us".
        opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    
        client = documentai.DocumentProcessorServiceClient(client_options=opts)
    
        if gcs_input_uri:
            # Specify specific GCS URIs to process individual documents
            gcs_document = documentai.GcsDocument(
                gcs_uri=gcs_input_uri, mime_type=input_mime_type
            )
            ##############
            # This line is where the single document is being added
            ##############
            gcs_documents = documentai.GcsDocuments(documents=[gcs_document])
            input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
        else:
            # Specify a GCS URI Prefix to process an entire directory
            gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_prefix)
            input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)
    
        # Cloud Storage URI for the Output Directory
        gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
            gcs_uri=gcs_output_uri, field_mask=field_mask
        )
    
        # Where to write results
        output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config)
    
        if processor_version_id:
            # The full resource name of the processor version, e.g.:
            # projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}
            name = client.processor_version_path(
                project_id, location, processor_id, processor_version_id
            )
        else:
            # The full resource name of the processor, e.g.:
            # projects/{project_id}/locations/{location}/processors/{processor_id}
            name = client.processor_path(project_id, location, processor_id)
    
        request = documentai.BatchProcessRequest(
            name=name,
            input_documents=input_config,
            document_output_config=output_config,
        )
    
        # BatchProcess returns a Long Running Operation (LRO)
        operation = client.batch_process_documents(request)
    
        # Continually polls the operation until it is complete.
        # This could take some time for larger files
        # Format: projects/{project_id}/locations/{location}/operations/{operation_id}
        try:
            print(f"Waiting for operation {operation.operation.name} to complete...")
            operation.result(timeout=timeout)
        # Catch exception when operation doesn't finish before timeout
        except (RetryError, InternalServerError) as e:
            print(e.message)
    
        # NOTE: Can also use callbacks for asynchronous processing
        #
        # def my_callback(future):
        #   result = future.result()
        #
        # operation.add_done_callback(my_callback)
    
        # Once the operation is complete,
        # get output document information from operation metadata
        metadata = documentai.BatchProcessMetadata(operation.metadata)
    
        if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED:
            raise ValueError(f"Batch Process Failed: {metadata.state_message}")
    
        storage_client = storage.Client()
    
        print("Output files:")
        # One process per Input Document
        for process in list(metadata.individual_process_statuses):
            # output_gcs_destination format: gs://BUCKET/PREFIX/OPERATION_NUMBER/INPUT_FILE_NUMBER/
            # The Cloud Storage API requires the bucket name and URI prefix separately
            matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination)
            if not matches:
                print(
                    "Could not parse output GCS destination:",
                    process.output_gcs_destination,
                )
                continue
    
            output_bucket, output_prefix = matches.groups()
    
            # Get List of Document Objects from the Output Bucket
            output_blobs = storage_client.list_blobs(output_bucket, prefix=output_prefix)
    
            # Document AI may output multiple JSON files per source file
            for blob in output_blobs:
                # Document AI should only output JSON files to GCS
                if blob.content_type != "application/json":
                    print(
                        f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}"
                    )
                    continue
    
                # Download JSON File as bytes object and convert to Document Object
                print(f"Fetching {blob.name}")
                document = documentai.Document.from_json(
                    blob.download_as_bytes(), ignore_unknown_fields=True
                )
    
                # For a full list of Document object attributes, please reference this page:
                # https://cloud.google.com/python/docs/reference/documentai/latest/google.cloud.documentai_v1.types.Document
    
                # Read the text recognition output from the processor
                print("The document contains the following text:")
                print(document.text)
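
    Calling the sample for a single GCS file would then look something like this (placeholder values, matching the variables described at the top of the sample):

    batch_process_documents(
        project_id="my_project_id",
        location="us",
        processor_id="my_processor_id",
        gcs_output_uri="gs://my_bucket/output/",
        gcs_input_uri="gs://my_bucket/my_image.jpeg",
        input_mime_type="image/jpeg",
    )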