I have some images in azure container as blobs, I wanted to detect if there are any faces in the images using faceapi and create a Person Group with different persons in python. I do not want to download images into local directory. Using the following code I was able to run read an image from a blob but the faceapi is not accepting that as input.
from azure.storage.blob import BlobClient
blob = BlobClient(account_url="https://****.blob.core.windows.net/",
container_name="demo",credential="**" )
data = blob.download_blob()
Let me know if there is a way I will be able to do this.
According to the code you provide, when you create a BlobClient
, you do not provide blobName. Please update the code as BlobClient(account_url="", container_name="", blob_name="", credential="account key")
For example
from azure.cognitiveservices.vision.face import FaceClient
from msrest.authentication import CognitiveServicesCredentials
from azure.cognitiveservices.vision.face.models import TrainingStatusType, Person
import os
import io
import json
import uuid
import time
import sys
from azure.storage.blob import BlobClient
account_key = ''
blob = BlobClient(account_url="https://*.blob.core.windows.net/",
container_name="image",
blob_name='my.jpg',
credential=account_key)
image = blob.download_blob()
endpoint = 'https://*.cognitiveservices.azure.com/'
key = ''
face_client = FaceClient(endpoint, CognitiveServicesCredentials(key))
PERSON_GROUP_ID = str(uuid.uuid4())
face_client.person_group.create(
person_group_id=PERSON_GROUP_ID, name=PERSON_GROUP_ID)
woman = face_client.person_group_person.create(PERSON_GROUP_ID, "Woman")
face_client.person_group_person.add_face_from_stream(
PERSON_GROUP_ID, woman.person_id, io.BytesIO(image.content_as_bytes(max_concurrency=3)))
face_client.person_group.train(PERSON_GROUP_ID)
while (True):
training_status = face_client.person_group.get_training_status(
PERSON_GROUP_ID)
print("Training status: {}.".format(training_status.status))
print()
if (training_status.status is TrainingStatusType.succeeded):
break
elif (training_status.status is TrainingStatusType.failed):
sys.exit('Training the person group has failed.')
time.sleep(5)