Snowflake Data Quality Validation

Data quality validation is a very hot topic nowadays. Data Analysts and Data Scientists usually need to run data validation rules before data is consumed by reporting or ML models.
Although there are many tools that support data quality checks, such as Great Expectations and dbt, it is not always easy to integrate them with existing ETL processes, especially when data pipelines are orchestrated by Snowflake Tasks.

I have created a few Snowflake stored procedures that can be reused across different tables to run data quality checks. All warnings and failures are saved in a logging table.
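
The exact definition of the logging table is not shown here; the sketch below is only an assumption, with column names inferred from the INSERT statements used by the procedures:

CREATE TABLE IF NOT EXISTS PLATFORM.DQ_CHECK_LOGGING (
    CHECK_TIMESTAMP TIMESTAMP_NTZ,  -- when the check ran (sysdate())
    TEST_NAME       VARCHAR,        -- DQ_NOT_NULL, DQ_UNIQUE or DQ_MIN_MAX
    TABLE_NAME      VARCHAR,
    COLUMN_NAME     VARCHAR,
    MESSAGE         VARCHAR         -- full warning/failure message
);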

Each stored procedure takes a few input arguments:
– table name
– column name
– method (warn or fail)
The DQ_MIN_MAX procedure additionally takes a minimum and a maximum allowed value.

Test pass
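
For illustration, a call like the one below (table and column names are just placeholders) returns a passing message when the column contains no NULL values:

CALL PLATFORM.DQ_NOT_NULL('SALES.ORDERS', 'ORDER_ID', 'WARN');
-- Returns: 'DQ_NOT_NULL check passed.'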

Test warn
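
With the method set to WARN, a failing check writes a row to PLATFORM.DQ_CHECK_LOGGING and returns the failure message without raising an error. A hypothetical example with placeholder names:

CALL PLATFORM.DQ_NOT_NULL('SALES.ORDERS', 'CUSTOMER_ID', 'WARN');
-- Returns something like:
-- 'DQ_NOT_NULL check failed. There are 3 null values for CUSTOMER_ID column in SALES.ORDERS'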

Test fail
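
With the method set to FAIL, the same failing check also calls PLATFORM.ASSERT, so the procedure (and anything that called it, e.g. a task) errors out. Again with placeholder names:

CALL PLATFORM.DQ_NOT_NULL('SALES.ORDERS', 'CUSTOMER_ID', 'FAIL');
-- Fails with a JavaScript execution error that carries the failure message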

Test Results
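
All logged warnings and failures can then be reviewed directly from the logging table:

SELECT * FROM PLATFORM.DQ_CHECK_LOGGING;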

Stored Procedure DQ_NOT_NULL
CREATE OR REPLACE PROCEDURE PLATFORM.DQ_NOT_NULL(table_name varchar, column_name varchar, warn_fail varchar)
RETURNS varchar
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE
  test_name VARCHAR DEFAULT 'DQ_NOT_NULL';
  output VARCHAR;
  message_output VARCHAR;
BEGIN

    -- Count NULL values in the target column
    select count(1) into :output from identifier(:table_name) where identifier(:column_name) is null;
    
    IF (output>0) THEN
      message_output := :test_name || ' check failed. There are ' || :output || ' null values for ' || :column_name || ' column in ' || :table_name;
      INSERT INTO PLATFORM.DQ_CHECK_LOGGING VALUES (sysdate(),:test_name, :table_name, :column_name, :message_output);
      IF (warn_fail='FAIL') THEN
        call platform.assert(:message_output);
      END IF;
    ELSE
      message_output := :test_name || ' check passed.';  
    END IF;

    RETURN message_output;
    
END;
$$;
Stored Procedure DQ_UNIQUE
CREATE OR REPLACE PROCEDURE PLATFORM.DQ_UNIQUE(table_name varchar, column_name varchar, warn_fail varchar)
RETURNS varchar
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE
  test_name VARCHAR DEFAULT 'DQ_UNIQUE';
  output VARCHAR;
  message_output VARCHAR;
BEGIN

    -- Count how many rows belong to duplicated values of the target column
    WITH FINAL AS (
    SELECT identifier(:column_name), count(1) as no_duplicates
    FROM identifier(:table_name)
    GROUP BY identifier(:column_name)
    HAVING count(1)>1)
    -- SUM returns NULL when there are no duplicates, so default it to 0
    select nvl(sum(no_duplicates), 0) into :output from FINAL;
    
    IF (output>0) THEN
      message_output := :test_name || ' check failed. There are ' || :output || ' duplicates for ' || :column_name || ' column in ' || :table_name;
      INSERT INTO PLATFORM.DQ_CHECK_LOGGING VALUES (sysdate(),:test_name, :table_name, :column_name, :message_output);
      IF (warn_fail='FAIL') THEN
        call platform.assert(:message_output);
      END IF;
    ELSE
      message_output := :test_name || ' check passed.';  
    END IF;

    RETURN message_output;
    
END;
$$;
Stored Procedure DQ_MIN_MAX
CREATE OR REPLACE PROCEDURE PLATFORM.DQ_MIN_MAX(table_name varchar, column_name varchar, warn_fail varchar, min_value number(36,16), max_value number(36,16))
RETURNS varchar
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE
  test_name VARCHAR DEFAULT 'DQ_MIN_MAX';
  output VARCHAR;
  message_output VARCHAR;
BEGIN

    -- Count records that fall outside the allowed [min_value, max_value] range
    select count(1) into :output from identifier(:table_name) where identifier(:column_name) < :min_value or identifier(:column_name) > :max_value;
    
    IF (output>0) THEN
      message_output := :test_name || ' check failed. There are ' || :output || ' records for ' || :column_name || ' column in ' || :table_name || ' outside the range ' || :min_value || ' to ' || :max_value;
      INSERT INTO PLATFORM.DQ_CHECK_LOGGING VALUES (sysdate(),:test_name, :table_name, :column_name, :message_output);
      IF (warn_fail='FAIL') THEN
        call platform.assert(:message_output);
      END IF;
    ELSE
      message_output := :test_name || ' check passed.';  
    END IF;

    RETURN message_output;
    
END;
$$;
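
A hypothetical call for the range check, with placeholder table, column and bounds:

CALL PLATFORM.DQ_MIN_MAX('SALES.ORDERS', 'ORDER_AMOUNT', 'WARN', 0, 1000);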

Unfortunately, I didn’t manage to fail a stored procedure with a dynamic failure message using exceptions in Snowflake SQL Scripting, so I used a JavaScript stored procedure for that instead.

CREATE OR REPLACE PROCEDURE PLATFORM.ASSERT(VALUE STRING)
  RETURNS STRING
  LANGUAGE JAVASCRIPT
AS
$$
    // Throwing here makes the calling SQL procedure fail with this message
    throw `Raise Error: '${VALUE}'`
$$;
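
Because the checks are plain stored procedures, they can also be wrapped in a Snowflake Task so that a FAIL check aborts the pipeline. A minimal sketch, assuming a warehouse called MY_WH and placeholder object names:

CREATE OR REPLACE TASK PLATFORM.DQ_ORDERS_NOT_NULL_TASK
  WAREHOUSE = MY_WH          -- placeholder warehouse
  SCHEDULE = '60 MINUTE'     -- placeholder schedule; a child task could use AFTER instead
AS
  CALL PLATFORM.DQ_NOT_NULL('SALES.ORDERS', 'CUSTOMER_ID', 'FAIL');

ALTER TASK PLATFORM.DQ_ORDERS_NOT_NULL_TASK RESUME;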