Snowflake Task Error Notification (Teams Channel)

Snowflake error integration for Snowpipe and tasks became generally available for AWS, Azure, and GCP in February 2023. If a data pipeline fails, Snowflake can send an error message to Azure Event Grid. That message can then trigger an Azure Function that posts a notification to a Teams channel. This article describes how to set it up.

Architecture Diagram

Implementation Example

Instructions

1. Create Azure Event Grid Topic

Create, view, and manage system topics in Azure Event Grid (portal) – Azure Event Grid | Microsoft Learn

2. Create a notification integration in Snowflake. Replace the XXX placeholders so that the integration points to the Azure Event Grid topic created in step 1 and to your Azure tenant.
CREATE NOTIFICATION INTEGRATION IF NOT EXISTS ERROR_TEAMS_CHANNEL
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_EVENT_GRID
  DIRECTION = OUTBOUND
  AZURE_EVENT_GRID_TOPIC_ENDPOINT = 'https://XXXXXXXXXXXXXXXXXXXX.westeurope-1.eventgrid.azure.net/api/events'
  AZURE_TENANT_ID = 'XXXXXXXXXXXXXXXXXXXXXX';
3. Add the error integration to Snowflake tasks and pipes. Replace XXXX with the task name and the pipe name.
ALTER TASK XXXX SET ERROR_INTEGRATION = ERROR_TEAMS_CHANNEL;
ALTER PIPE XXXX SET ERROR_INTEGRATION = ERROR_TEAMS_CHANNEL;
4. Create a Teams channel webhook

Create an Incoming Webhook – Teams | Microsoft Learn
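Before wiring the webhook into the Azure Function, it can help to check that it accepts a message at all. The following is a minimal sketch using only the Python standard library. The function names `build_webhook_request` and `send` and the placeholder URL are assumptions made for illustration, and the plain `{"text": ...}` payload is the simplest message an incoming webhook accepts, not the card used later in this article.

```python
import json
import urllib.request

# Placeholder (assumption): replace with the webhook URL copied from Teams.
WEBHOOK_URL = 'https://example.invalid/webhook'


def build_webhook_request(webhook_url: str, text: str) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying a plain text message."""
    payload = json.dumps({'text': text}).encode('utf-8')
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def send(request: urllib.request.Request, timeout: float = 10) -> int:
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


if __name__ == '__main__':
    request = build_webhook_request(WEBHOOK_URL, 'Webhook test from Python')
    print(request.get_method(), request.full_url)
    # Uncomment once WEBHOOK_URL points at a real webhook:
    # print(send(request))
```

If the commented-out call is enabled and the URL is valid, the test message should appear in the channel.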

5. Create an Event Grid triggered Python function, replace the contents of __init__.py with the following code, and deploy it. Replace XXXXX with your Teams channel webhook URL.

Create a Python function using Visual Studio Code – Azure Functions | Microsoft Learn

import json
import logging
import requests
import azure.functions as func


def main(event: func.EventGridEvent):

    #get and log message input
    message_text = event.get_json()
    json_string = json.dumps(message_text)
    logging.info('Python EventGrid trigger processed an event: %s', json_string)

    #api configuration
    endpoint = 'XXXXX'
    api_headers = {"Content-type":"application/json"}

    # Get values from the Event Grid message
    datafactory= 'SNOWFLAKE_' + message_text['messageType']
    timestamp = message_text['timestamp']

    if message_text['messageType'] == 'INGEST_FAILED_FILE':
        pipeline = message_text['pipeName']
        activity = message_text['pipeName']
        error_message= message_text['messages'][0]['firstError']
    elif message_text['messageType'] == 'USER_TASK_FAILED':
        pipeline = message_text['rootTaskName']
        activity = message_text['taskName']
        error_message= message_text['messages'][0]['errorMessage']
    else:
        pipeline = 'Unknown'
        activity = 'Unknown'
        error_message= message_text['messages'][0]['errorMessage']

    # Prepare body
    body = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "summary": "Error Message",
        "sections": [
            {
                "activityImage": "https://miro.medium.com/max/1400/1*pUEZd8z__1p-7ICIO1NZFA.png",
                "activityTitle": "Data Platform pipeline failed",
                "activitySubtitle": "" + error_message,
                "facts": [
                    {
                    "name": "Datafactory",
                    "size": "Medium",
                    "value": "" + datafactory
                    },
                    {
                    "name": "Pipeline",
                    "value": "" + pipeline
                    },
                    {
                    "name": "Activity",
                    "value": "" + activity
                    },
                    {
                    "name": "When",
                    "value": "" + timestamp
                    }
                ],
                "markdown": True
            }]
    }

    # Post the card to the Teams webhook and raise on an HTTP error status
    response = requests.post(url=endpoint, json=body, headers=api_headers, timeout=10)
    response.raise_for_status()
    logging.info('Message was sent to teams channel')
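
The branching on messageType in the function above can be pulled out into a small pure function, which makes it possible to test the mapping locally without deploying anything. The following is a minimal sketch, not part of the deployed function. The helper name `extract_fields` and all sample values are hypothetical, and the sample payloads contain only the fields the function above already reads. Unlike the code above, the sketch falls back to 'Unknown' for missing keys instead of raising a KeyError.

```python
from typing import Any, Dict


def extract_fields(message: Dict[str, Any]) -> Dict[str, str]:
    """Map a Snowflake error notification payload to the card fields.

    Hypothetical helper that mirrors the if/elif/else logic of main().
    """
    message_type = message.get('messageType', 'UNKNOWN')
    messages = message.get('messages') or [{}]
    first = messages[0]

    if message_type == 'INGEST_FAILED_FILE':
        # Snowpipe failure: the pipe name serves as pipeline and activity
        pipeline = activity = message.get('pipeName', 'Unknown')
        error_message = first.get('firstError', 'Unknown')
    elif message_type == 'USER_TASK_FAILED':
        # Task failure: root task is the pipeline, failed task the activity
        pipeline = message.get('rootTaskName', 'Unknown')
        activity = message.get('taskName', 'Unknown')
        error_message = first.get('errorMessage', 'Unknown')
    else:
        pipeline = activity = 'Unknown'
        error_message = first.get('errorMessage', 'Unknown')

    return {
        'datafactory': 'SNOWFLAKE_' + message_type,
        'pipeline': pipeline,
        'activity': activity,
        'error_message': error_message,
        'timestamp': message.get('timestamp', ''),
    }


# Sample payloads with made-up values (assumptions for illustration only).
pipe_event = {
    'messageType': 'INGEST_FAILED_FILE',
    'timestamp': '2023-02-01T10:00:00.000Z',
    'pipeName': 'MY_DB.MY_SCHEMA.MY_PIPE',
    'messages': [{'firstError': 'Numeric value is not recognized'}],
}
task_event = {
    'messageType': 'USER_TASK_FAILED',
    'timestamp': '2023-02-01T11:00:00.000Z',
    'rootTaskName': 'MY_ROOT_TASK',
    'taskName': 'MY_CHILD_TASK',
    'messages': [{'errorMessage': 'SQL compilation error'}],
}

if __name__ == '__main__':
    print(extract_fields(pipe_event))
    print(extract_fields(task_event))
```

Keeping the mapping separate from the HTTP call also means new message types can be added and checked without sending anything to Teams.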
6. Create an event subscription on the Event Grid topic that triggers the Azure Function when a new message arrives.

Azure Event Grid subscriptions through portal – Azure Event Grid | Microsoft Learn