Snowflake Error Integration for snowpipe and tasks become generally available for AWS, Azure and GCP in February 2023. You can send error message to Azure Event Grid if data pipeline fails. Then message can trigger Azure Function sending teams channel notification. Let’s discribe it in this article.
Architecture Diagram
Implementation Example
Instruction
1. Create Azure Event Grid Topic
2. Create Integration in Snowflake. Please replace XXX in order to refer to Azure Event Grid Topic created in point 1.
CREATE NOTIFICATION INTEGRATION IF NOT EXISTS ERROR_TEAMS_CHANNEL ENABLED = true TYPE = QUEUE NOTIFICATION_PROVIDER = AZURE_EVENT_GRID DIRECTION = OUTBOUND AZURE_EVENT_GRID_TOPIC_ENDPOINT = 'https://XXXXXXXXXXXXXXXXXXXX.westeurope-1.eventgrid.azure.net/api/events' AZURE_TENANT_ID = 'XXXXXXXXXXXXXXXXXXXXXX';
3. Add Error Integration to Snowflake Tasks and Pipes. Please replace XXX by task name and pipe name.
ALTER TASK XXXX SET ERROR_INTEGRATION = ERROR_TEAMS_CHANNEL; ALTER PIPE XXXX SET ERROR_INTEGRATION = ERROR_TEAMS_CHANNEL;
4. Create Teams Channel webhook
Create an Incoming Webhook – Teams | Microsoft Learn
5. Create EventGrid triggered Python Function, replace __init__.py by following code and deploy. Please replace XXX by your teams channel webhook.
Create a Python function using Visual Studio Code – Azure Functions | Microsoft Learn
import json import logging import requests import azure.functions as func def main(event: func.EventGridEvent): #get and log message input message_text = event.get_json() json_string = json.dumps(message_text) logging.info('Python EventGrid trigger processed an event: %s', json_string) #api configuration endpoint = 'XXXXX' api_headers = {"Content-type":"application/json"} # Get values from event hub message datafactory= 'SNOWFLAKE_' + message_text['messageType'] timestamp = message_text['timestamp'] if message_text['messageType'] == 'INGEST_FAILED_FILE': pipeline = message_text['pipeName'] activity = message_text['pipeName'] error_message= message_text['messages'][0]['firstError'] elif message_text['messageType'] == 'USER_TASK_FAILED': pipeline = message_text['rootTaskName'] activity = message_text['taskName'] error_message= message_text['messages'][0]['errorMessage'] else: pipeline = 'Unknown' activity = 'Unknown' error_message= message_text['messages'][0]['errorMessage'] # Prepare body body = { "@type": "MessageCard", "@context": "http://schema.org/extensions", "summary": "Error Message", "sections": [ { "activityImage": "https://miro.medium.com/max/1400/1*pUEZd8z__1p-7ICIO1NZFA.png", "activityTitle": "Data Platform pipeline failed", "activitySubtitle": "" + error_message, "facts": [ { "name": "Datafactory", "size": "Medium", "value": "" + datafactory }, { "name": "Pipeline", "value": "" + pipeline }, { "name": "Activity", "value": "" + activity }, { "name": "When", "value": "" + timestamp } ], "markdown": 'true' }] } # Making call response = requests.post(url=endpoint, json=body, headers=api_headers) logging.info('Message was sent to teams channel')
6. Create Event Subscription that will trigger Azure Function if new message arrives.
Azure Event Grid subscriptions through portal – Azure Event Grid | Microsoft Learn