I have been testing out the Google Document AI Python client, but I couldn't get the process_document() function to work when processing a single document stored in Google Cloud Storage.
What I currently have:
Other options I've tried:
Which brings me to the question: how do you process a single GCS-stored file with the Python client?
I have tried modifying the quick start example by replacing the RawDocument class with other classes that take a GCS URI as input, but it didn't work:
The class GcsDocument isn't accepted by ProcessRequest to begin with. Trying to pass GcsDocument to either the raw_document or inline_document attribute anyway will raise the error:
TypeError: Message must be initialized with a dict: google.cloud.documentai.v1.ProcessRequest
The class Document appears to be usable with ProcessRequest(inline_document=Document()).
However, even when following the example provided in the process_document() documentation, the call will raise the error:
400 Only content payload is supported for Sync Process.
Here is a code snippet that will raise this error:
from google.api_core.client_options import ClientOptions
from google.cloud import documentai

opts = ClientOptions(api_endpoint="us-documentai.googleapis.com")
client = documentai.DocumentProcessorServiceClient(client_options=opts)

name = client.processor_path("my_project_id", "us", "my_processor_id")

gcs_document = documentai.Document(
    uri="gs://my_bucket/my_image.jpeg",
    mime_type="image/jpeg",
)

request = documentai.ProcessRequest(
    name=name,
    inline_document=gcs_document,
)

# 400 Only content payload is supported for Sync Process.
result = client.process_document(request=request)
You cannot use Online Processing (process_document()) for files in Google Cloud Storage without first downloading them locally.
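If you want to stick with synchronous processing, one workaround is to download the file's bytes from Cloud Storage and pass them as a RawDocument. A minimal sketch, assuming the google-cloud-storage client and placeholder project, processor, bucket, and object names:

from google.api_core.client_options import ClientOptions
from google.cloud import documentai, storage

opts = ClientOptions(api_endpoint="us-documentai.googleapis.com")
client = documentai.DocumentProcessorServiceClient(client_options=opts)
name = client.processor_path("my_project_id", "us", "my_processor_id")

# Download the object from Cloud Storage (placeholder bucket/object names)
storage_client = storage.Client()
blob = storage_client.bucket("my_bucket").blob("my_image.jpeg")
image_content = blob.download_as_bytes()

# Online processing accepts the raw bytes via RawDocument
raw_document = documentai.RawDocument(content=image_content, mime_type="image/jpeg")
request = documentai.ProcessRequest(name=name, raw_document=raw_document)
result = client.process_document(request=request)
print(result.document.text)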
Update: Online Processing functionality has been updated to allow processing of a single file in Google Cloud Storage. You can add a GCS document link using the gcs_document parameter in the processing request.
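As a rough sketch, assuming a recent google-cloud-documentai release that exposes the gcs_document field on ProcessRequest (and reusing the client and name from above; the URI is a placeholder):

gcs_document = documentai.GcsDocument(
    gcs_uri="gs://my_bucket/my_image.jpeg",  # placeholder URI
    mime_type="image/jpeg",
)
request = documentai.ProcessRequest(name=name, gcs_document=gcs_document)
result = client.process_document(request=request)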
You can use Batch Processing (batch_process_documents()) to process a single document from Google Cloud Storage. It works exactly the same as processing multiple documents, except that you provide a single document instead of a prefix or a list of documents. Both the input and the output live in Google Cloud Storage, and the processing is asynchronous.
The Python code sample in Send a processing request > Batch processing shows exactly what you are looking for. Look for the ############## comment markers:
import re
from typing import Optional
from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import RetryError
from google.cloud import documentai # type: ignore
from google.cloud import storage
# TODO(developer): Uncomment these variables before running the sample.
# project_id = "YOUR_PROJECT_ID"
# location = "YOUR_PROCESSOR_LOCATION" # Format is "us" or "eu"
# processor_id = "YOUR_PROCESSOR_ID" # Create processor before running sample
# gcs_output_uri = "YOUR_OUTPUT_URI" # Must end with a trailing slash `/`. Format: gs://bucket/directory/subdirectory/
# processor_version_id = "YOUR_PROCESSOR_VERSION_ID" # Optional. Example: pretrained-ocr-v1.0-2020-09-23
# TODO(developer): You must specify either `gcs_input_uri` and `input_mime_type` or `gcs_input_prefix`
# gcs_input_uri = "YOUR_INPUT_URI" # Format: gs://bucket/directory/file.pdf
# input_mime_type = "application/pdf"
# gcs_input_prefix = "YOUR_INPUT_URI_PREFIX" # Format: gs://bucket/directory/
# field_mask = "text,entities,pages.pageNumber" # Optional. The fields to return in the Document object.
def batch_process_documents(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_output_uri: str,
    processor_version_id: Optional[str] = None,
    gcs_input_uri: Optional[str] = None,
    input_mime_type: Optional[str] = None,
    gcs_input_prefix: Optional[str] = None,
    field_mask: Optional[str] = None,
    timeout: int = 400,
) -> None:
    # You must set the `api_endpoint` if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=opts)

    if gcs_input_uri:
        # Specify specific GCS URIs to process individual documents
        gcs_document = documentai.GcsDocument(
            gcs_uri=gcs_input_uri, mime_type=input_mime_type
        )
        ##############
        # This line is where the single document is being added
        ##############
        gcs_documents = documentai.GcsDocuments(documents=[gcs_document])
        input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
    else:
        # Specify a GCS URI Prefix to process an entire directory
        gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_prefix)
        input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)

    # Cloud Storage URI for the Output Directory
    gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
        gcs_uri=gcs_output_uri, field_mask=field_mask
    )

    # Where to write results
    output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config)

    if processor_version_id:
        # The full resource name of the processor version, e.g.:
        # projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}
        name = client.processor_version_path(
            project_id, location, processor_id, processor_version_id
        )
    else:
        # The full resource name of the processor, e.g.:
        # projects/{project_id}/locations/{location}/processors/{processor_id}
        name = client.processor_path(project_id, location, processor_id)

    request = documentai.BatchProcessRequest(
        name=name,
        input_documents=input_config,
        document_output_config=output_config,
    )

    # BatchProcess returns a Long Running Operation (LRO)
    operation = client.batch_process_documents(request)

    # Continually polls the operation until it is complete.
    # This could take some time for larger files
    # Format: projects/{project_id}/locations/{location}/operations/{operation_id}
    try:
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result(timeout=timeout)
    # Catch exception when operation doesn't finish before timeout
    except (RetryError, InternalServerError) as e:
        print(e.message)

    # NOTE: Can also use callbacks for asynchronous processing
    #
    # def my_callback(future):
    #   result = future.result()
    #
    # operation.add_done_callback(my_callback)

    # Once the operation is complete,
    # get output document information from operation metadata
    metadata = documentai.BatchProcessMetadata(operation.metadata)

    if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED:
        raise ValueError(f"Batch Process Failed: {metadata.state_message}")

    storage_client = storage.Client()

    print("Output files:")
    # One process per Input Document
    for process in list(metadata.individual_process_statuses):
        # output_gcs_destination format: gs://BUCKET/PREFIX/OPERATION_NUMBER/INPUT_FILE_NUMBER/
        # The Cloud Storage API requires the bucket name and URI prefix separately
        matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination)
        if not matches:
            print(
                "Could not parse output GCS destination:",
                process.output_gcs_destination,
            )
            continue

        output_bucket, output_prefix = matches.groups()

        # Get List of Document Objects from the Output Bucket
        output_blobs = storage_client.list_blobs(output_bucket, prefix=output_prefix)

        # Document AI may output multiple JSON files per source file
        for blob in output_blobs:
            # Document AI should only output JSON files to GCS
            if blob.content_type != "application/json":
                print(
                    f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}"
                )
                continue

            # Download JSON File as bytes object and convert to Document Object
            print(f"Fetching {blob.name}")
            document = documentai.Document.from_json(
                blob.download_as_bytes(), ignore_unknown_fields=True
            )

            # For a full list of Document object attributes, please reference this page:
            # https://cloud.google.com/python/docs/reference/documentai/latest/google.cloud.documentai_v1.types.Document

            # Read the text recognition output from the processor
            print("The document contains the following text:")
            print(document.text)
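For completeness, a hedged usage sketch that runs the sample for a single GCS file; every value below is a placeholder:

# Example call for one file in Cloud Storage (placeholder values)
batch_process_documents(
    project_id="my_project_id",
    location="us",
    processor_id="my_processor_id",
    gcs_output_uri="gs://my_bucket/output/",  # must end with a trailing slash
    gcs_input_uri="gs://my_bucket/my_image.jpeg",
    input_mime_type="image/jpeg",
)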