Skip to content

Latest commit

 

History

History
455 lines (333 loc) · 18.6 KB

functions-bindings-storage-queue-trigger.md

File metadata and controls

455 lines (333 loc) · 18.6 KB
title description author ms.topic ms.date ms.author ms.custom
Azure Queue storage trigger for Azure Functions
Learn to run an Azure Function as Azure Queue storage data changes.
craigshoemaker
reference
02/18/2020
cshoe
devx-track-csharp, cc996988-fb4f-47, devx-track-python

Azure Queue storage trigger for Azure Functions

The queue storage trigger runs a function as messages are added to Azure Queue storage.

Encoding

Functions expect a base64 encoded string. Any adjustments to the encoding type (in order to prepare data as a base64 encoded string) need to be implemented in the calling service.

Example

Use the queue trigger to start a function when a new item is received on a queue. The queue message is provided as input to the function.

The following example shows a C# function that polls the myqueue-items queue and writes a log each time a queue item is processed.

public static class QueueFunctions
{
    [FunctionName("QueueTrigger")]
    public static void QueueTrigger(
        [QueueTrigger("myqueue-items")] string myQueueItem, 
        ILogger log)
    {
        log.LogInformation($"C# function processed: {myQueueItem}");
    }
}

The following example shows a queue trigger binding in a function.json file and C# script (.csx) code that uses the binding. The function polls the myqueue-items queue and writes a log each time a queue item is processed.

Here's the function.json file:

{
    "disabled": false,
    "bindings": [
        {
            "type": "queueTrigger",
            "direction": "in",
            "name": "myQueueItem",
            "queueName": "myqueue-items",
            "connection":"MyStorageConnectionAppSetting"
        }
    ]
}

The configuration section explains these properties.

Here's the C# script code:

#r "Microsoft.WindowsAzure.Storage"

using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Queue;
using System;

public static void Run(CloudQueueMessage myQueueItem, 
    DateTimeOffset expirationTime, 
    DateTimeOffset insertionTime, 
    DateTimeOffset nextVisibleTime,
    string queueTrigger,
    string id,
    string popReceipt,
    int dequeueCount,
    ILogger log)
{
    log.LogInformation($"C# Queue trigger function processed: {myQueueItem.AsString}\n" +
        $"queueTrigger={queueTrigger}\n" +
        $"expirationTime={expirationTime}\n" +
        $"insertionTime={insertionTime}\n" +
        $"nextVisibleTime={nextVisibleTime}\n" +
        $"id={id}\n" +
        $"popReceipt={popReceipt}\n" + 
        $"dequeueCount={dequeueCount}");
}

The usage section explains myQueueItem, which is named by the name property in function.json. The message metadata section explains all of the other variables shown.

The following Java example shows a storage queue trigger function, which logs the triggered message placed into queue myqueuename.

@FunctionName("queueprocessor")
public void run(
   @QueueTrigger(name = "msg",
                  queueName = "myqueuename",
                  connection = "myconnvarname") String message,
    final ExecutionContext context
) {
    context.getLogger().info(message);
}

The following example shows a queue trigger binding in a function.json file and a JavaScript function that uses the binding. The function polls the myqueue-items queue and writes a log each time a queue item is processed.

Here's the function.json file:

{
    "disabled": false,
    "bindings": [
        {
            "type": "queueTrigger",
            "direction": "in",
            "name": "myQueueItem",
            "queueName": "myqueue-items",
            "connection":"MyStorageConnectionAppSetting"
        }
    ]
}

The configuration section explains these properties.

Note

The name parameter reflects as context.bindings.<name> in the JavaScript code which contains the queue item payload. This payload is also passed as the second parameter to the function.

Here's the JavaScript code:

module.exports = async function (context, message) {
    context.log('Node.js queue trigger function processed work item', message);
    // OR access using context.bindings.<name>
    // context.log('Node.js queue trigger function processed work item', context.bindings.myQueueItem);
    context.log('expirationTime =', context.bindingData.expirationTime);
    context.log('insertionTime =', context.bindingData.insertionTime);
    context.log('nextVisibleTime =', context.bindingData.nextVisibleTime);
    context.log('id =', context.bindingData.id);
    context.log('popReceipt =', context.bindingData.popReceipt);
    context.log('dequeueCount =', context.bindingData.dequeueCount);
    context.done();
};

The usage section explains myQueueItem, which is named by the name property in function.json. The message metadata section explains all of the other variables shown.

The following example demonstrates how to read a queue message passed to a function via a trigger.

A Storage queue trigger is defined in function.json file where type is set to queueTrigger.

{
  "bindings": [
    {
      "name": "QueueItem",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "messages",
      "connection": "MyStorageConnectionAppSetting"
    }
  ]
}

The code in the Run.ps1 file declares a parameter as $QueueItem, which allows you to read the queue message in your function.

# Input bindings are passed in via param block.
param([string] $QueueItem,$TriggerMetadata)

# Write out the queue message and metadata to the information log.
Write-Host"PowerShell queue trigger function processed work item: $QueueItem"
Write-Host"Queue item expiration time: $($TriggerMetadata.ExpirationTime)"
Write-Host"Queue item insertion time: $($TriggerMetadata.InsertionTime)"
Write-Host"Queue item next visible time: $($TriggerMetadata.NextVisibleTime)"
Write-Host"ID: $($TriggerMetadata.Id)"
Write-Host"Pop receipt: $($TriggerMetadata.PopReceipt)"
Write-Host"Dequeue count: $($TriggerMetadata.DequeueCount)"

The following example demonstrates how to read a queue message passed to a function via a trigger.

A Storage queue trigger is defined in function.json where type is set to queueTrigger.

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "messages",
      "connection": "AzureStorageQueuesConnectionString"
    }
  ]
}

The code _init_.py declares a parameter as func.QueueMessage, which allows you to read the queue message in your function.

import logging
import json

import azure.functions as func

def main(msg: func.QueueMessage):
    logging.info('Python queue trigger function processed a queue item.')

    result = json.dumps({
        'id': msg.id,
        'body': msg.get_body().decode('utf-8'),
        'expiration_time': (msg.expiration_time.isoformat()
                            if msg.expiration_time else None),
        'insertion_time': (msg.insertion_time.isoformat()
                           if msg.insertion_time else None),
        'time_next_visible': (msg.time_next_visible.isoformat()
                              if msg.time_next_visible else None),
        'pop_receipt': msg.pop_receipt,
        'dequeue_count': msg.dequeue_count
    })

    logging.info(result)

Attributes and annotations

In C# class libraries, use the following attributes to configure a queue trigger:

  • QueueTriggerAttribute

    The attribute's constructor takes the name of the queue to monitor, as shown in the following example:

    [FunctionName("QueueTrigger")]
    public static void Run(
        [QueueTrigger("myqueue-items")] string myQueueItem, 
        ILogger log)
    {
        ...
    }

    You can set the Connection property to specify the app setting that contains the storage account connection string to use, as shown in the following example:

    [FunctionName("QueueTrigger")]
    public static void Run(
        [QueueTrigger("myqueue-items", Connection = "StorageConnectionAppSetting")] string myQueueItem, 
        ILogger log)
    {
        ....
    }

    For a complete example, see example.

  • StorageAccountAttribute

    Provides another way to specify the storage account to use. The constructor takes the name of an app setting that contains a storage connection string. The attribute can be applied at the parameter, method, or class level. The following example shows class level and method level:

    [StorageAccount("ClassLevelStorageAppSetting")]
    public static class AzureFunctions
    {
        [FunctionName("QueueTrigger")]
        [StorageAccount("FunctionLevelStorageAppSetting")]
        public static void Run( //...
    {
        ...
    }

The storage account to use is determined in the following order:

  • The QueueTrigger attribute's Connection property.
  • The StorageAccount attribute applied to the same parameter as the QueueTrigger attribute.
  • The StorageAccount attribute applied to the function.
  • The StorageAccount attribute applied to the class.
  • The "AzureWebJobsStorage" app setting.

Attributes are not supported by C# Script.

The QueueTrigger annotation gives you access to the queue that triggers the function. The following example makes the queue message available to the function via the message parameter.

package com.function;
import com.microsoft.azure.functions.annotation.*;
import java.util.Queue;
import com.microsoft.azure.functions.*;

public class QueueTriggerDemo {
    @FunctionName("QueueTriggerDemo")
    public void run(
        @QueueTrigger(name = "message", queueName = "messages", connection = "MyStorageConnectionAppSetting") String message,
        final ExecutionContext context
    ) {
        context.getLogger().info("Queue message: " + message);
    }
}
Property Description
name Declares the parameter name in the function signature. When the function is triggered, this parameter's value has the contents of the queue message.
queueName Declares the queue name in the storage account.
connection Points to the storage account connection string.

Attributes are not supported by JavaScript.

Attributes are not supported by PowerShell.

Attributes are not supported by Python.


Configuration

The following table explains the binding configuration properties that you set in the function.json file and the QueueTrigger attribute.

function.json property Attribute property Description
type n/a Must be set to queueTrigger. This property is set automatically when you create the trigger in the Azure portal.
direction n/a In the function.json file only. Must be set to in. This property is set automatically when you create the trigger in the Azure portal.
name n/a The name of the variable that contains the queue item payload in the function code.
queueName QueueName The name of the queue to poll.
connection Connection The name of an app setting that contains the Storage connection string to use for this binding. If the app setting name begins with "AzureWebJobs", you can specify only the remainder of the name here. For example, if you set connection to "MyStorage", the Functions runtime looks for an app setting that is named "MyStorage." If you leave connection empty, the Functions runtime uses the default Storage connection string in the app setting that is named AzureWebJobsStorage.

[!INCLUDE app settings to local.settings.json]

Usage

Access the message data by using a method parameter such as string paramName. You can bind to any of the following types:

  • Object - The Functions runtime deserializes a JSON payload into an instance of an arbitrary class defined in your code.
  • string
  • byte[]
  • CloudQueueMessage

If you try to bind to CloudQueueMessage and get an error message, make sure that you have a reference to the correct Storage SDK version.

Access the message data by using a method parameter such as string paramName. The paramName is the value specified in the name property of function.json. You can bind to any of the following types:

  • Object - The Functions runtime deserializes a JSON payload into an instance of an arbitrary class defined in your code.
  • string
  • byte[]
  • CloudQueueMessage

If you try to bind to CloudQueueMessage and get an error message, make sure that you have a reference to the correct Storage SDK version.

The QueueTrigger annotation gives you access to the queue message that triggered the function.

The queue item payload is available via context.bindings.<NAME> where <NAME> matches the name defined in function.json. If the payload is JSON, the value is deserialized into an object.

Access the queue message via string parameter that matches the name designated by binding's name parameter in the function.json file.

Access the queue message via the parameter typed as QueueMessage.


Message metadata

The queue trigger provides several metadata properties. These properties can be used as part of binding expressions in other bindings or as parameters in your code. The properties are members of the CloudQueueMessage class.

Property Type Description
QueueTrigger string Queue payload (if a valid string). If the queue message payload is a string, QueueTrigger has the same value as the variable named by the name property in function.json.
DequeueCount int The number of times this message has been dequeued.
ExpirationTime DateTimeOffset The time that the message expires.
Id string Queue message ID.
InsertionTime DateTimeOffset The time that the message was added to the queue.
NextVisibleTime DateTimeOffset The time that the message will next be visible.
PopReceipt string The message's pop receipt.

Poison messages

When a queue trigger function fails, Azure Functions retries the function up to five times for a given queue message, including the first try. If all five attempts fail, the functions runtime adds a message to a queue named <originalqueuename>-poison. You can write a function to process messages from the poison queue by logging them or sending a notification that manual attention is needed.

To handle poison messages manually, check the dequeueCount of the queue message.

Polling algorithm

The queue trigger implements a random exponential back-off algorithm to reduce the effect of idle-queue polling on storage transaction costs.

The algorithm uses the following logic:

  • When a message is found, the runtime waits two seconds and then checks for another message
  • When no message is found, it waits about four seconds before trying again.
  • After subsequent failed attempts to get a queue message, the wait time continues to increase until it reaches the maximum wait time, which defaults to one minute.
  • The maximum wait time is configurable via the maxPollingInterval property in the host.json file.

For local development the maximum polling interval defaults to two seconds.

In regard to billing, time spent polling by the runtime is "free" and not counted against your account.

Concurrency

When there are multiple queue messages waiting, the queue trigger retrieves a batch of messages and invokes function instances concurrently to process them. By default, the batch size is 16. When the number being processed gets down to 8, the runtime gets another batch and starts processing those messages. So the maximum number of concurrent messages being processed per function on one virtual machine (VM) is 24. This limit applies separately to each queue-triggered function on each VM. If your function app scales out to multiple VMs, each VM will wait for triggers and attempt to run functions. For example, if a function app scales out to 3 VMs, the default maximum number of concurrent instances of one queue-triggered function is 72.

The batch size and the threshold for getting a new batch are configurable in the host.json file. If you want to minimize parallel execution for queue-triggered functions in a function app, you can set the batch size to 1. This setting eliminates concurrency only so long as your function app runs on a single virtual machine (VM).

The queue trigger automatically prevents a function from processing a queue message multiple times simultaneously.

host.json properties

The host.json file contains settings that control queue trigger behavior. See the host.json settings section for details regarding available settings.

Next steps