This is not an official Google product. This is a tutorial aiming at giving you ideas...
Dear developers,
- Do you like the adage "a picture is worth a thousand words"? I do!
- Let's check if it also works for "a picture is worth a thousand frames".
- In this tutorial, you'll see the following:
- How to understand the content of a video in a blink,
- in less than 300 lines of Python (3.9) code.
Here is a visual summary example, generated from a 2'42" video made of 35 sequences (also known as video shots):
Note: The summary is a grid where each cell is a frame representing a video shot.
This tutorial has 2 objectives, 1 practical and 1 technical:
- Automatically generate visual summaries of videos
- Build a processing pipeline with these properties:
- managed (always ready and easy to set up)
- scalable (able to ingest several videos in parallel)
- not costing anything when not used
A few tools are enough:
- Storage space for videos and results
- A serverless solution to run the code
- A machine learning model to analyze videos
- A library to extract frames from videos
- A library to generate the visual summaries
Here is a possible architecture using 3 Google Cloud services (Cloud Storage, Cloud Functions, and Video Intelligence API):
The processing pipeline follows these steps:
- You upload a video to the 1st bucket (a bucket is a storage space in the cloud)
- The upload event automatically triggers the 1st function
- The function sends a request to the Video Intelligence API to detect the shots
- The Video Intelligence API analyzes the video and uploads the results (annotations) to the 2nd bucket
- The upload event triggers the 2nd function
- The function downloads both annotation and video files
- The function renders and uploads the summary to the 3rd bucket
- The video summary is ready!
Open source client libraries let you interface with Google Cloud services in idiomatic Python. You'll use the following:
Cloud Storage
- To manage downloads and uploads
- https://pypi.org/project/google-cloud-storage
Video Intelligence API
- To analyze videos
- https://pypi.org/project/google-cloud-videointelligence
Here is a choice of 2 additional Python libraries for the graphical needs:
OpenCV
- To extract video frames
- There's even a headless version (without GUI features), which is ideal for a service
- https://pypi.org/project/opencv-python-headless
Pillow
- To generate the visual summaries
Pillow
is a very popular imaging library, both extensive and easy to use- https://pypi.org/project/Pillow
Assuming you have a Google Cloud account, you can set up the architecture from Cloud Shell with the gcloud
and gsutil
commands. This lets you script everything from scratch in a reproducible way.
# Project
PROJECT_NAME="Visual Summary"
PROJECT_ID="visual-summary-REPLACE_WITH_UNIQUE_SUFFIX"
# Cloud Storage region (https://cloud.google.com/storage/docs/locations)
GCS_REGION="europe-west1"
# Cloud Functions region (https://cloud.google.com/functions/docs/locations)
GCF_REGION="europe-west1"
# Source
GIT_REPO="cherry-on-py"
PROJECT_SRC=~/$PROJECT_ID/$GIT_REPO/gcf_video_summary
# Cloud Storage buckets (environment variables)
export VIDEO_BUCKET="b1-videos_${PROJECT_ID}"
export ANNOTATION_BUCKET="b2-annotations_${PROJECT_ID}"
export SUMMARY_BUCKET="b3-summaries_${PROJECT_ID}"
Note: You can use your GitHub username as a unique suffix.
gcloud projects create $PROJECT_ID \
--name="$PROJECT_NAME" \
--set-as-default
Create in progress for [https://cloudresourcemanager.googleapis.com/v1/projects/PROJECT_ID].
Waiting for [operations/cp...] to finish...done.
Enabling service [cloudapis.googleapis.com] on project [PROJECT_ID]...
Operation "operations/acf..." finished successfully.
Updated property [core/project] to [PROJECT_ID].
# Link project with billing account (single account)
BILLING_ACCOUNT=$(gcloud beta billing accounts list \
--format 'value(name)')
# Link project with billing account (specific one among multiple accounts)
BILLING_ACCOUNT=$(gcloud beta billing accounts list \
--format 'value(name)' \
--filter "displayName='My Billing Account'")
gcloud beta billing projects link $PROJECT_ID --billing-account $BILLING_ACCOUNT
billingAccountName: billingAccounts/XXXXXX-YYYYYY-ZZZZZZ
billingEnabled: true
name: projects/PROJECT_ID/billingInfo
projectId: PROJECT_ID
# Create buckets with uniform bucket-level access
gsutil mb -b on -c regional -l $GCS_REGION gs://$VIDEO_BUCKET
gsutil mb -b on -c regional -l $GCS_REGION gs://$ANNOTATION_BUCKET
gsutil mb -b on -c regional -l $GCS_REGION gs://$SUMMARY_BUCKET
Creating gs://VIDEO_BUCKET/...
Creating gs://ANNOTATION_BUCKET/...
Creating gs://SUMMARY_BUCKET/...
You can check how it looks like in the Cloud Console:
Create a service account. This is for development purposes only (not needed for production). This provides you with credentials to run your code locally.
mkdir ~/$PROJECT_ID
cd ~/$PROJECT_ID
SERVICE_ACCOUNT_NAME="dev-service-account"
SERVICE_ACCOUNT="${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
gcloud iam service-accounts keys create ~/$PROJECT_ID/key.json --iam-account $SERVICE_ACCOUNT
Created service account [SERVICE_ACCOUNT_NAME].
created key [...] of type [json] as [~/PROJECT_ID/key.json] for [SERVICE_ACCOUNT]
Set the GOOGLE_APPLICATION_CREDENTIALS
environment variable and check that it points to the service account key. When you run the application code in the current shell session, client libraries will use these credentials for authentication. If you open a new shell session, set the variable again.
export GOOGLE_APPLICATION_CREDENTIALS=~/$PROJECT_ID/key.json
cat $GOOGLE_APPLICATION_CREDENTIALS
{
"type": "service_account",
"project_id": "PROJECT_ID",
"private_key_id": "...",
"private_key": "-----BEGIN PRIVATE KEY-----\n...",
"client_email": "SERVICE_ACCOUNT",
...
}
Authorize the service account to access the buckets:
IAM_BINDING="serviceAccount:${SERVICE_ACCOUNT}:roles/storage.objectAdmin"
gsutil iam ch $IAM_BINDING gs://$VIDEO_BUCKET
gsutil iam ch $IAM_BINDING gs://$ANNOTATION_BUCKET
gsutil iam ch $IAM_BINDING gs://$SUMMARY_BUCKET
A few APIs are enabled by default:
gcloud services list
NAME TITLE
bigquery.googleapis.com BigQuery API
bigquerystorage.googleapis.com BigQuery Storage API
cloudapis.googleapis.com Google Cloud APIs
clouddebugger.googleapis.com Cloud Debugger API
cloudtrace.googleapis.com Cloud Trace API
datastore.googleapis.com Cloud Datastore API
logging.googleapis.com Cloud Logging API
monitoring.googleapis.com Cloud Monitoring API
servicemanagement.googleapis.com Service Management API
serviceusage.googleapis.com Service Usage API
sql-component.googleapis.com Cloud SQL
storage-api.googleapis.com Google Cloud Storage JSON API
storage-component.googleapis.com Cloud Storage
Enable the Video Intelligence, Cloud Functions, and Cloud Build APIs:
gcloud services enable \
videointelligence.googleapis.com \
cloudfunctions.googleapis.com \
cloudbuild.googleapis.com
Operation "operations/acf..." finished successfully.
Note: Cloud Build generates container images for Cloud Functions upon deployment.
Retrieve the source code:
cd ~/$PROJECT_ID
git clone https://github.com/PicardParis/$GIT_REPO.git
Cloning into 'GIT_REPO'...
...
The Video Intelligence API is a pre-trained machine learning model that can analyze videos. One of the multiple features is video shot detection. For the 1st Cloud Function, here is a possible core function calling annotate_video()
with the SHOT_CHANGE_DETECTION
feature:
from google.cloud import storage, videointelligence
def launch_shot_detection(video_uri: str, annot_bucket: str):
"""Detect video shots (asynchronous operation)
Results will be stored in <annot_uri> with this naming convention:
- video_uri: gs://video_bucket/path/to/video.ext
- annot_uri: gs://annot_bucket/video_bucket/path/to/video.ext.json
"""
print(f"Launching shot detection for <{video_uri}>...")
features = [videointelligence.Feature.SHOT_CHANGE_DETECTION]
video_blob = storage.Blob.from_string(video_uri)
video_bucket = video_blob.bucket.name
path_to_video = video_blob.name
annot_uri = f"gs://{annot_bucket}/{video_bucket}/{path_to_video}.json"
request = dict(features=features, input_uri=video_uri, output_uri=annot_uri)
video_client = videointelligence.VideoIntelligenceServiceClient()
video_client.annotate_video(request)
Before deploying the function, you need to develop and test it. Create a Python 3 virtual environment and activate it:
cd ~/$PROJECT_ID
python3 -m venv venv
source venv/bin/activate
Install the dependencies:
pip install -r $PROJECT_SRC/gcf1_detect_shots/requirements.txt
Check the dependencies:
pip list
Package Version
------------------------------ ----------
...
google-cloud-storage 1.42.3
google-cloud-videointelligence 2.3.3
...
You can use the main scope to test the function in script mode:
import os
ANNOTATION_BUCKET = os.getenv("ANNOTATION_BUCKET", "")
assert ANNOTATION_BUCKET, "Undefined ANNOTATION_BUCKET environment variable"
if __name__ == "__main__":
# Local tests only (service account needed)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"video_uri", type=str, help="gs://video_bucket/path/to/video.ext"
)
args = parser.parse_args()
launch_shot_detection(args.video_uri, ANNOTATION_BUCKET)
Note: You have already exported the
ANNOTATION_BUCKET
environment variable earlier in the shell session; you will also use it later at deployment stage. This makes the code generic and lets you reuse it independently of the output bucket.
Test the function:
VIDEO_PATH="cloud-samples-data/video/gbikes_dinosaur.mp4"
VIDEO_URI="gs://$VIDEO_PATH"
python $PROJECT_SRC/gcf1_detect_shots/main.py $VIDEO_URI
Launching shot detection for <gs://cloud-samples-data/video/gbikes_dinosaur.mp4>...
Note: The test video
<gbikes_dinosaur.mp4>
is located in an external bucket. This works because the video is publicly accessible.
Wait a moment and check that the annotations have been generated:
gsutil ls -r gs://$ANNOTATION_BUCKET
964 YYYY-MM-DDThh:mm:ssZ gs://ANNOTATION_BUCKET/VIDEO_PATH.json
TOTAL: 1 objects, 964 bytes (964 B)
Check the last 200 bytes of the annotation file:
gsutil cat -r -200 gs://$ANNOTATION_BUCKET/$VIDEO_PATH.json
}
}, {
"start_time_offset": {
"seconds": 28,
"nanos": 166666000
},
"end_time_offset": {
"seconds": 42,
"nanos": 766666000
}
} ]
} ]
}
Note: Those are the start and end positions of the last video shot. Everything seems fine.
Clean up when you're finished:
gsutil rm gs://$ANNOTATION_BUCKET/$VIDEO_PATH.json
deactivate
rm -rf venv
def gcf_detect_shots(data, context):
"""Cloud Function triggered by a new Cloud Storage object"""
video_bucket = data["bucket"]
path_to_video = data["name"]
video_uri = f"gs://{video_bucket}/{path_to_video}"
launch_shot_detection(video_uri, ANNOTATION_BUCKET)
Note: This function will be called whenever a video is uploaded to the bucket defined as a trigger.
Deploy the 1st function:
GCF_NAME="gcf1_detect_shots"
GCF_SOURCE="$PROJECT_SRC/gcf1_detect_shots"
GCF_ENTRY_POINT="gcf_detect_shots"
GCF_TRIGGER_BUCKET="$VIDEO_BUCKET"
GCF_ENV_VARS="ANNOTATION_BUCKET=$ANNOTATION_BUCKET"
GCF_MEMORY="128MB"
gcloud functions deploy $GCF_NAME \
--runtime python39 \
--source $GCF_SOURCE \
--entry-point $GCF_ENTRY_POINT \
--update-env-vars $GCF_ENV_VARS \
--trigger-bucket $GCF_TRIGGER_BUCKET \
--region $GCF_REGION \
--memory $GCF_MEMORY \
--quiet
Note: The default memory allocated for a Cloud Function is 256 MB (possible values are 128MB, 256MB, 512MB, 1024MB, and 2048MB). As the function has no memory or CPU needs (it sends a simple API request), the minimum memory setting is enough.
Deploying function (may take a while - up to 2 minutes)...done.
availableMemoryMb: 128
entryPoint: gcf_detect_shots
environmentVariables:
ANNOTATION_BUCKET: b2-annotations...
eventTrigger:
eventType: google.storage.object.finalize
...
status: ACTIVE
timeout: 60s
updateTime: 'YYYY-MM-DDThh:mm:ss.mmmZ'
versionId: '1'
Note: The
ANNOTATION_BUCKET
environment variable is defined with the--update-env-vars
flag. Using an environment variable lets you deploy the exact same code with different trigger and output buckets.
Here is how it looks like in the Cloud Console:
Make sure to test the function in production. Copy a video into the video bucket:
VIDEO_NAME="gbikes_dinosaur.mp4"
SRC_URI="gs://cloud-samples-data/video/$VIDEO_NAME"
DST_URI="gs://$VIDEO_BUCKET/$VIDEO_NAME"
gsutil cp $SRC_URI $DST_URI
Copying gs://cloud-samples-data/video/gbikes_dinosaur.mp4 [Content-Type=video/mp4]...
- [1 files][ 62.0 MiB/ 62.0 MiB]
Operation completed over 1 objects/62.0 MiB.
Query the logs to check that the function has been triggered:
gcloud functions logs read --region $GCF_REGION
LEVEL NAME EXECUTION_ID TIME_UTC LOG
D gcf1_detect_shots ... ... Function execution started
I gcf1_detect_shots ... ... Launching shot detection for <gs://VIDEO_BUCKET/VIDEO_NAME>...
D gcf1_detect_shots ... ... Function execution took 874 ms, finished with status: 'ok'
Wait a moment and check the annotation bucket:
gsutil ls -r gs://$ANNOTATION_BUCKET
You should see the annotation file:
gs://ANNOTATION_BUCKET/VIDEO_BUCKET/:
gs://ANNOTATION_BUCKET/VIDEO_BUCKET/VIDEO_NAME.json
The 1st function is operational!
It's interesting to split the code into 2 main classes:
StorageHelper
for local file and cloud storage object managementVideoProcessor
for graphical processings
Here is a possible core function:
class VideoProcessor:
@staticmethod
def generate_summary(annot_uri: str, output_bucket: str):
""" Generate a video summary from video shot annotations """
try:
with StorageHelper(annot_uri, output_bucket) as storage:
with VideoProcessor(storage) as video_proc:
print("Generating summary...")
image = video_proc.render_summary()
video_proc.upload_summary_as_jpeg(image)
except Exception:
logging.exception("Could not generate summary from <%s>", annot_uri)
Note: If exceptions are raised, it's handy to log them with
logging.exception()
to get a stack trace in production logs.
The class manages the following:
- The retrieval and parsing of video shot annotations
- The download of source videos
- The upload of generated visual summaries
- File names
class StorageHelper:
"""Local+Cloud storage helper
- Uses a temp dir for local processing (e.g. video frame extraction)
- Paths are relative to this temp dir (named after the output bucket)
Naming convention:
- video_uri: gs://video_bucket/path/to/video.ext
- annot_uri: gs://annot_bucket/video_bucket/path/to/video.ext.json
- video_path: video_bucket/path/to/video.ext
- summary_path: video_bucket/path/to/video.ext.SUFFIX
- summary_uri: gs://output_bucket/video_bucket/path/to/video.ext.SUFFIX
"""
client = storage.Client()
video_shots: list[VideoShot]
video_path: Path
video_local_path: Path
upload_bucket: storage.Bucket
def __init__(self, annot_uri: str, output_bucket: str):
if not annot_uri.endswith(ANNOT_EXT):
raise RuntimeError(f"annot_uri must end with <{ANNOT_EXT}>")
self.video_shots = self.get_video_shots(annot_uri)
self.video_path = self.video_path_from_uri(annot_uri)
temp_root = Path(tempfile.gettempdir(), output_bucket)
temp_root.mkdir(parents=True, exist_ok=True)
self.video_local_path = temp_root.joinpath(self.video_path)
self.upload_bucket = self.client.bucket(output_bucket)
The source video is handled in the with
statement context manager:
def __enter__(self):
self.download_video()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.video_local_path.unlink()
Note: Once downloaded, the video uses memory space in the
/tmp
RAM disk (the only writable space for the serverless function). It's best to delete temporary files when they're not needed anymore, to avoid potential out-of-memory errors on future invocations of the function.
The video annotations can be retrieved with the methods storage.Blob.download_as_text()
and json.loads()
:
def get_video_shots(self, annot_uri: str) -> list[VideoShot]:
json_blob = storage.Blob.from_string(annot_uri, self.client)
api_response: dict = json.loads(json_blob.download_as_text())
single_video_results: dict = api_response["annotation_results"][0]
annotations: list = single_video_results["shot_annotations"]
return [VideoShot.from_dict(annotation) for annotation in annotations]
The parsing is handled with this VideoShot
helper class:
class VideoShot(NamedTuple):
"""Video shot start/end positions in nanoseconds"""
pos1_ns: int
pos2_ns: int
NANOS_PER_SECOND = 10 ** 9
@classmethod
def from_dict(cls, annotation: dict) -> "VideoShot":
def time_offset_in_ns(time_offset) -> int:
seconds: int = time_offset.get("seconds", 0)
nanos: int = time_offset.get("nanos", 0)
return seconds * cls.NANOS_PER_SECOND + nanos
pos1_ns = time_offset_in_ns(annotation["start_time_offset"])
pos2_ns = time_offset_in_ns(annotation["end_time_offset"])
return cls(pos1_ns, pos2_ns)
The naming convention was chosen to keep consistent object paths between the different buckets. This also lets you deduce the video path from the annotation URI:
def video_path_from_uri(self, annot_uri: str) -> Path:
annot_blob = storage.Blob.from_string(annot_uri)
return Path(annot_blob.name[: -len(ANNOT_EXT)])
The video is directly downloaded with storage.Blob.download_to_filename()
:
def download_video(self):
video_uri = f"gs://{self.video_path.as_posix()}"
blob = storage.Blob.from_string(video_uri, self.client)
print(f"Downloading -> {self.video_local_path}")
self.video_local_path.parent.mkdir(parents=True, exist_ok=True)
blob.download_to_filename(self.video_local_path)
On the opposite, results can be uploaded with storage.Blob.upload_from_string()
:
def upload_summary(self, image_bytes: bytes, image_type: str):
path = self.summary_path(image_type)
blob = self.upload_bucket.blob(path.as_posix())
content_type = f"image/{image_type}"
print(f"Uploading -> {blob.name}")
blob.upload_from_string(image_bytes, content_type)
Note:
Pillow
supports working with memory images, which avoids having to manage local files.
And finally, here is a possible naming convention for the summary files:
def summary_path(self, image_type: str) -> Path:
video_name = self.video_path.name
shot_count = len(self.video_shots)
suffix = f"summary{shot_count:03d}.{image_type}"
summary_name = f"{video_name}.{suffix}"
return Path(self.video_path.parent, summary_name)
The class manages the following:
- Video frame extraction
- Visual summary generation
import cv2 as cv
from PIL import Image
from storage_helper import StorageHelper
PilImage = Image.Image
ImageSize = NamedTuple("ImageSize", [("w", int), ("h", int)])
class VideoProcessor:
storage: StorageHelper
video: cv.VideoCapture
cell_size: ImageSize
grid_size: ImageSize
def __init__(self, storage: StorageHelper):
self.storage = storage
Opening and closing the video is handled in the with
statement context manager:
def __enter__(self):
video_path = self.storage.video_local_path
self.video = cv.VideoCapture(str(video_path))
if not self.video.isOpened():
raise RuntimeError(f"Could not open video <{video_path}>")
self.compute_grid_dimensions()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.video.release()
The video summary is a grid of cells which can be rendered in a single loop with two generators:
def render_summary(self, shot_ratio: float = 0.5) -> PilImage:
grid_img = Image.new("RGB", self.grid_size, RGB_BACKGROUND)
img_and_pos_iter = zip(self.gen_cell_img(shot_ratio), self.gen_cell_pos())
for cell_img, cell_pos in img_and_pos_iter:
cell_img.thumbnail(self.cell_size) # Makes it smaller if needed
grid_img.paste(cell_img, cell_pos)
return grid_img
Note:
shot_ratio
is set to0.5
by default to extract video shot middle frames.
The first generator yields cell images:
def gen_cell_img(self, shot_ratio: float) -> Iterator[PilImage]:
assert 0.0 <= shot_ratio <= 1.0
MS_IN_NS = 10 ** 6
for video_shot in self.storage.video_shots:
pos1_ns, pos2_ns = video_shot
pos_ms = (pos1_ns + shot_ratio * (pos2_ns - pos1_ns)) / MS_IN_NS
yield self.frame_at_position(pos_ms)
The second generator yields cell positions:
def gen_cell_pos(self) -> Iterator[tuple[int, int]]:
cell_x, cell_y = 0, 0
while True:
yield cell_x, cell_y
cell_x += self.cell_size.w
if self.grid_size.w <= cell_x: # Move to next row?
cell_x, cell_y = 0, cell_y + self.cell_size.h
OpenCV
easily allows extracting video frames at a given position:
def frame_at_position(self, pos_ms: float) -> PilImage:
self.video.set(cv.CAP_PROP_POS_MSEC, pos_ms)
_, cv_frame = self.video.read()
return Image.fromarray(cv.cvtColor(cv_frame, cv.COLOR_BGR2RGB))
Choosing the summary grid composition is arbitrary. Here is an example to compose a summary preserving the video proportions:
def compute_grid_dimensions(self):
shot_count = len(self.storage.video_shots)
if shot_count < 1:
raise RuntimeError(f"Expected 1+ video shots (got {shot_count})")
# Try to preserve the video aspect ratio
# Consider cells as pixels and try to fit them in a square
cols = rows = int(shot_count ** 0.5 + 0.5)
if cols * rows < shot_count:
cols += 1
cell_w = int(self.video.get(cv.CAP_PROP_FRAME_WIDTH))
cell_h = int(self.video.get(cv.CAP_PROP_FRAME_HEIGHT))
if SUMMARY_MAX_SIZE.w < cell_w * cols:
scale = SUMMARY_MAX_SIZE.w / (cell_w * cols)
cell_w = int(scale * cell_w)
cell_h = int(scale * cell_h)
self.cell_size = ImageSize(cell_w, cell_h)
self.grid_size = ImageSize(cell_w * cols, cell_h * rows)
Finally, Pillow
gives full control on image serializations:
def upload_summary_as_jpeg(self, image: PilImage):
mem_file = BytesIO()
image_type = "jpeg"
jpeg_save_parameters = dict(optimize=True, progressive=True)
image.save(mem_file, format=image_type, **jpeg_save_parameters)
image_bytes = mem_file.getvalue()
self.storage.upload_summary(image_bytes, image_type)
Note: Working with in-memory images avoids managing local files and uses less memory.
You can use the main scope to test the function in script mode:
import os
from video_processor import VideoProcessor
SUMMARY_BUCKET = os.getenv("SUMMARY_BUCKET", "")
assert SUMMARY_BUCKET, "Undefined SUMMARY_BUCKET environment variable"
if __name__ == "__main__":
# Local tests only (service account needed)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"annot_uri", type=str, help="gs://annotation_bucket/path/to/video.ext.json"
)
args = parser.parse_args()
VideoProcessor.generate_summary(args.annot_uri, SUMMARY_BUCKET)
Test the function:
cd ~/$PROJECT_ID
python3 -m venv venv
source venv/bin/activate
pip install -r $PROJECT_SRC/gcf2_generate_summary/requirements.txt
VIDEO_NAME="gbikes_dinosaur.mp4"
ANNOTATION_URI="gs://$ANNOTATION_BUCKET/$VIDEO_BUCKET/$VIDEO_NAME.json"
python $PROJECT_SRC/gcf2_generate_summary/main.py $ANNOTATION_URI
Downloading -> /tmp/SUMMARY_BUCKET/VIDEO_BUCKET/VIDEO_NAME
Generating summary...
Uploading -> VIDEO_BUCKET/VIDEO_NAME.summary004.jpeg
Note: The uploaded video summary shows 4 shots.
Clean up:
deactivate
rm -rf venv
def gcf_generate_summary(data, context):
"""Cloud Function triggered by a new Cloud Storage object"""
annotation_bucket = data["bucket"]
path_to_annotation = data["name"]
annot_uri = f"gs://{annotation_bucket}/{path_to_annotation}"
VideoProcessor.generate_summary(annot_uri, SUMMARY_BUCKET)
Note: This function will be called whenever an annotation file is uploaded to the bucket defined as a trigger.
GCF_NAME="gcf2_generate_summary"
GCF_SOURCE="$PROJECT_SRC/gcf2_generate_summary"
GCF_ENTRY_POINT="gcf_generate_summary"
GCF_TRIGGER_BUCKET="$ANNOTATION_BUCKET"
GCF_ENV_VARS="SUMMARY_BUCKET=$SUMMARY_BUCKET"
GCF_TIMEOUT="540s"
GCF_MEMORY="512MB"
gcloud functions deploy $GCF_NAME \
--runtime python39 \
--source $GCF_SOURCE \
--entry-point $GCF_ENTRY_POINT \
--update-env-vars $GCF_ENV_VARS \
--trigger-bucket $GCF_TRIGGER_BUCKET \
--region $GCF_REGION \
--timeout $GCF_TIMEOUT \
--memory $GCF_MEMORY \
--quiet
Notes:
- The default timeout for a Cloud Function is 60 seconds. As you're deploying a background function with potentially long processings, set it to the maximum value (540 seconds = 9 minutes).
- You also need to bump up the memory a little for the video and image processings. Depending on the size of your videos and the maximum resolution of your output summaries, or if you need to generate the summary faster (memory size and vCPU speed are correlated), you might use a higher value (1024MB or 2048MB).
Deploying function (may take a while - up to 2 minutes)...done.
availableMemoryMb: 512
entryPoint: gcf_generate_summary
environmentVariables:
SUMMARY_BUCKET: b3-summaries...
...
status: ACTIVE
timeout: 540s
updateTime: 'YYYY-MM-DDThh:mm:ss.mmmZ'
versionId: '1'
Here is how it looks like in the Cloud Console:
Make sure to test the function in production. You can upload an annotation file in the 2nd bucket:
VIDEO_NAME="gbikes_dinosaur.mp4"
ANNOTATION_FILE="$VIDEO_NAME.json"
ANNOTATION_URI="gs://$ANNOTATION_BUCKET/$VIDEO_BUCKET/$ANNOTATION_FILE"
gsutil cp $ANNOTATION_URI .
gsutil cp $ANNOTATION_FILE $ANNOTATION_URI
rm $ANNOTATION_FILE
Note: This reuses the previous local test annotation file and overwrites it. Overwriting a file in a bucket also triggers attached functions.
Wait a few seconds and query the logs to check that the function has been triggered:
gcloud functions logs read --region $GCF_REGION
LEVEL NAME EXECUTION_ID TIME_UTC LOG
...
D gcf2_generate_summary ... ... Function execution started
I gcf2_generate_summary ... ... Downloading -> /tmp/SUMMARY_BUCKET/VIDEO_BUCKET/VIDEO_NAME
I gcf2_generate_summary ... ... Generating summary...
I gcf2_generate_summary ... ... Uploading -> VIDEO_BUCKET/VIDEO_NAME.summary004.jpeg
D gcf2_generate_summary ... ... Function execution took 11591 ms, finished with status: 'ok'
The 2nd function is operational and the pipeline is in place! You can now do end-to-end tests by copying new videos in the 1st bucket.
Download the generated summary on your computer:
cd ~/$PROJECT_ID
gsutil cp -r gs://$SUMMARY_BUCKET/**.jpeg .
cloudshell download *.jpeg
Here is the visual summary for gbikes_dinosaur.mp4
(4 detected shots):
You can also directly preview the file from the Cloud Console:
Now, the icing on the cake (or the "cherry on the pie" as we say in French)...
- Based on the same architecture and code, you can add a few features:
- Trigger the processing for videos from other buckets
- Generate summaries in multiple formats (such as JPEG, PNG, WEBP)
- Generate animated summaries (also in multiple formats, such as GIF, PNG, WEBP)
- Enrich the architecture to duplicate 2 items:
- The video shot detection function, to get it to run as an HTTP endpoint
- The summary generation function to handle animated images
- Adapt the code to support the new features:
- An
animated
parameter to generate still or animated summaries - Save and upload the results in multiple formats
- An
- A. Video shot detection can also be triggered manually with an HTTP GET request
- B. Still and animated summaries are generated in 2 functions in parallel
- C. Summaries are uploaded in multiple image formats
def gcf_detect_shots_http(request):
"""Cloud Function triggered by an HTTP GET request"""
if request.method != "GET":
return ("Please use a GET request", 403)
if not request.args or "video_uri" not in request.args:
return ('Please specify a "video_uri" parameter', 400)
video_uri = request.args["video_uri"]
launch_shot_detection(video_uri, ANNOTATION_BUCKET)
return f"Launched shot detection for <{video_uri}>"
Note: This is the same code as
gcf_detect_shots
with the video URI parameter provided from a GET request.
GCF_NAME="gcf1_detect_shots_http"
GCF_SOURCE="$PROJECT_SRC/gcf1_detect_shots"
GCF_ENTRY_POINT="gcf_detect_shots_http"
GCF_TRIGGER_BUCKET="$VIDEO_BUCKET"
GCF_ENV_VARS="ANNOTATION_BUCKET=$ANNOTATION_BUCKET"
GCF_MEMORY="128MB"
gcloud functions deploy $GCF_NAME \
--runtime python39 \
--source $GCF_SOURCE \
--entry-point $GCF_ENTRY_POINT \
--update-env-vars $GCF_ENV_VARS \
--trigger-http \
--region $GCF_REGION \
--memory $GCF_MEMORY \
--quiet
Here is how it looks like in the Cloud Console:
Add an animated
option in the core function:
class VideoProcessor:
@staticmethod
- def generate_summary(annot_uri: str, output_bucket: str):
+ def generate_summary(annot_uri: str, output_bucket: str, animated=False):
""" Generate a video summary from video shot annotations """
try:
with StorageHelper(annot_uri, output_bucket) as storage:
with VideoProcessor(storage) as video_proc:
print("Generating summary...")
- image = video_proc.render_summary()
- video_proc.upload_summary_as_jpeg(image)
+ if animated:
+ video_proc.generate_summary_animations()
+ else:
+ video_proc.generate_summary_stills()
except Exception:
logging.exception("Could not generate summary from <%s>", annot_uri)
Define the formats you're interested in generating:
ImageFormat = NamedTuple("ImageFormat", [("type", str), ("save_parameters", dict)])
IMAGE_JPEG = ImageFormat("jpeg", dict(optimize=True, progressive=True))
IMAGE_GIF = ImageFormat("gif", dict(optimize=True))
IMAGE_PNG = ImageFormat("png", dict(optimize=True))
IMAGE_WEBP = ImageFormat("webp", dict(lossless=False, quality=80, method=1))
SUMMARY_STILL_FORMATS = (IMAGE_JPEG, IMAGE_PNG, IMAGE_WEBP)
SUMMARY_ANIMATED_FORMATS = (IMAGE_GIF, IMAGE_PNG, IMAGE_WEBP)
Add support to generate still and animated summaries in different formats:
def generate_summary_stills(self):
image = self.render_summary()
for image_format in SUMMARY_STILL_FORMATS:
self.upload_summary([image], image_format)
def generate_summary_animations(self):
frame_count = ANIMATION_FRAMES
images = []
for frame_index in range(frame_count):
shot_ratio = (frame_index + 1) / (frame_count + 1)
print(f"shot_ratio: {shot_ratio:.0%}")
image = self.render_summary(shot_ratio)
images.append(image)
for image_format in SUMMARY_ANIMATED_FORMATS:
self.upload_summary(images, image_format)
The serialization can still take place in a single function:
def upload_summary(self, images: list[PilImage], image_format: ImageFormat):
if not images:
raise RuntimeError("Empty image list")
mem_file = BytesIO()
image_type = image_format.type
save_parameters = image_format.save_parameters.copy()
if animated := 1 < len(images):
save_parameters |= dict(
save_all=True,
append_images=images[1:],
duration=ANIMATION_FRAME_DURATION_MS,
loop=0, # Infinite loop
)
images[0].save(mem_file, format=image_type, **save_parameters)
image_bytes = mem_file.getvalue()
self.storage.upload_summary(image_bytes, image_type, animated)
Note:
Pillow
is both versatile and consistent, allowing for significant and clean code factorization.
Add an animated
optional parameter to the StorageHelper
class:
class StorageHelper:
- def upload_summary(self, image_bytes: bytes, image_type: str):
- path = self.summary_path(image_type)
+ def upload_summary(self, image_bytes: bytes, image_type: str, animated=False):
+ path = self.summary_path(image_type, animated)
blob = self.upload_bucket.blob(path.as_posix())
content_type = f"image/{image_type}"
print(f"Uploading -> {blob.name}")
blob.upload_from_string(image_bytes, content_type)
- def summary_path(self, image_type: str) -> Path:
+ def summary_path(self, image_type: str, animated=False) -> Path:
video_name = self.video_path.name
shot_count = self.shot_count()
- suffix = f"summary{shot_count:03d}.{image_type}"
+ still_or_anim = "anim" if animated else "still"
+ suffix = f"summary{shot_count:03d}_{still_or_anim}.{image_type}"
summary_name = f'{video_name}.{suffix}'
return Path(self.video_path.parent, summary_name)
And finally, add an ANIMATED
optional environment variable in the entry point:
...
+ANIMATED = os.getenv("ANIMATED", "0") == "1"
def gcf_generate_summary(data, context):
...
- VideoProcessor.generate_summary(annot_uri, SUMMARY_BUCKET)
+ VideoProcessor.generate_summary(annot_uri, SUMMARY_BUCKET, ANIMATED)
if __name__ == '__main__':
...
- VideoProcessor.generate_summary(args.annot_uri, SUMMARY_BUCKET)
+ VideoProcessor.generate_summary(args.annot_uri, SUMMARY_BUCKET, ANIMATED)
Duplicate the 2nd function with the additional ANIMATED
environment variable:
GCF_NAME="gcf2_generate_summary_animated"
GCF_SOURCE="$PROJECT_SRC/gcf2_generate_summary"
GCF_ENTRY_POINT="gcf_generate_summary"
GCF_TRIGGER_BUCKET="$ANNOTATION_BUCKET"
GCF_ENV_VARS1="SUMMARY_BUCKET=$SUMMARY_BUCKET"
GCF_ENV_VARS2="ANIMATED=1"
GCF_TIMEOUT="540s"
GCF_MEMORY="2048MB"
gcloud functions deploy $GCF_NAME \
--runtime python39 \
--source $GCF_SOURCE \
--entry-point $GCF_ENTRY_POINT \
--update-env-vars $GCF_ENV_VARS1 \
--update-env-vars $GCF_ENV_VARS2 \
--trigger-bucket $GCF_TRIGGER_BUCKET \
--region $GCF_REGION \
--timeout $GCF_TIMEOUT \
--memory $GCF_MEMORY \
--quiet
Here is how it looks like in the Cloud Console:
The HTTP endpoint lets you trigger the pipeline with a GET request:
GCF_NAME="gcf1_detect_shots_http"
VIDEO_URI="gs://cloud-samples-data/video/visionapi.mp4"
GCF_URL="https://$GCF_REGION-$PROJECT_ID.cloudfunctions.net/$GCF_NAME?video_uri=$VIDEO_URI"
curl $GCF_URL -H "Authorization: bearer $(gcloud auth print-identity-token)"
Launched shot detection for <VIDEO_URI>
Note: The test video
<visionapi.mp4>
is located in an external bucket but is publicly accessible.
In addition, copy one or several videos into the video bucket. You can drag and drop videos:
The videos are then processed in parallel. Here are a few logs:
LEVEL NAME EXECUTION_ID ... LOG
...
D gcf2_generate_summary_animated f6n6tslsfwdu ... Function execution took 49293 ms, finished with status: 'ok'
I gcf2_generate_summary yd1vqabafn17 ... Uploading -> b1-videos.../JaneGoodall.mp4.summary035_still.png
I gcf2_generate_summary_animated qv9b03814jjk ... shot_ratio: 43%
I gcf2_generate_summary yd1vqabafn17 ... Uploading -> b1-videos.../JaneGoodall.mp4.summary035_still.webp
D gcf2_generate_summary yd1vqabafn17 ... Function execution took 54616 ms, finished with status: 'ok'
I gcf2_generate_summary_animated g4d2wrzxz2st ... shot_ratio: 71%
...
D gcf2_generate_summary amwmov1wk0gn ... Function execution took 65256 ms, finished with status: 'ok'
I gcf2_generate_summary_animated 7pp882fz0x84 ... shot_ratio: 57%
I gcf2_generate_summary_animated i3u830hsjz4r ... Uploading -> b1-videos.../JaneGoodall.mp4.summary035_anim.png
I gcf2_generate_summary_animated i3u830hsjz4r ... Uploading -> b1-videos.../JaneGoodall.mp4.summary035_anim.webp
D gcf2_generate_summary_animated i3u830hsjz4r ... Function execution took 70862 ms, finished with status: 'ok'
...
In the 3rd bucket, you'll find all still and animated summaries:
You've already seen the still summary for <JaneGoodall.mp4>
as an introduction to this tutorial. In the animated version, and in only 6 frames, you get an even better idea of what the whole video is about:
If you don't want to keep your project, you can delete it:
gcloud projects delete $PROJECT_ID
How big is the code base?
first_line_after_licence=16
find $PROJECT_SRC -name '*.py' -exec tail -n +$first_line_after_licence {} \; | grep -v "^$" | wc -l
Number of Python lines:
262
- Video analysis and processing, with different options, run in less than 300 lines of readable Python.
- Less lines, less bugs!
- 🔥🐍 Mission accomplished! 🐍🔥
I hope you appreciated this tutorial and would love to read your feedback. You can also follow me on Twitter.