DBT Incremental Testing

Chaim Turkel · Israeli Tech Radar · May 3, 2023

As you know, I love DBT (https://www.getdbt.com/), and I am a big advocate for building a self-service data platform using DBT as the ELT block. Incremental tables are a major building block, and as we will see, DBT has a hole in this area (as in not whole :) ).

Two Words on DBT and ETL

ETL stands for Extract, Transform, and Load. It refers to the process of extracting data from various sources, transforming it to fit the needs of the target system or database, and loading it into the destination system.

DBT stands for Data Build Tool. It is an open-source command-line tool that is designed to help analysts and engineers manage their data transformations in a version-controlled, repeatable, and modular way.

In summary, ETL is a process for moving data from source to destination, while DBT is a tool for transforming and analyzing data in a repeatable and modular way.

What is an incremental table?

When creating a new model in DBT you have the option for it to be a view or a materialized table. The trade-off is performance vs. freshness of data. With a view, your data is always up to date, but the query might take a long time (depending on joins, filters, and other parameters). On the other hand, materialized tables are fast to query (the data is in place) but are updated only when the table is refreshed.

For example, let's say my source table has a JSON field, and I want to extract data from it into columns. This is a very heavy process that I don't want to run every time I select from the table, so it is a great candidate for a materialized table. To make sure that my table is relatively fresh, I will update it every few hours. The problem is that recreating the materialized table every few hours is very costly. What I would like to do is process only the newly added rows, and not reprocess old rows. This is where incremental tables come in.

From the documentation of DBT:

Incremental models are built as tables in your data warehouse. The first time a model is run, the table is built by transforming all rows of source data. On subsequent runs, dbt transforms only the rows in your source data that you tell dbt to filter for, inserting them into the target table which is the table that has already been built.

So if we are building a table based on a JSON source table, we add a where statement to our model that selects only the rows whose id in the JSON table is greater than the maximum id already in our table. This way DBT will bring in only the new rows from the JSON table to be processed.

In DBT jargon:

{% if is_incremental() %}
  -- this filter will only be applied on an incremental run
  where event_time > (select max(event_time) from {{ this }})
{% endif %}
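To make this concrete, here is a minimal sketch of an incremental model for the JSON example above. The source, model, and column names (raw.events, payload, user_id, event_type) are made up for illustration, and the payload:field::type extraction syntax is Snowflake-specific; adjust for your warehouse.

{{ config(materialized='incremental', unique_key='id') }}

select
    id,
    -- heavy JSON extraction that we only want to run on new rows
    payload:user_id::varchar    as user_id,
    payload:event_type::varchar as event_type,
    event_time
from {{ source('raw', 'events') }}

{% if is_incremental() %}
  -- only transform rows newer than what is already in the target table
  where event_time > (select max(event_time) from {{ this }})
{% endif %}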

Incremental Tests

So now that we understand what an incremental table is, let's look at what happens when we add a test to the table.

We will look at the simple case of a not_null test. This test scans the table to make sure that there are no nulls in the column we choose (let's say id). To my surprise, with an incremental table the test rescans the whole table and not just the new rows. Of course, you must understand your tests before judging my statement. If you want to test uniqueness, you must do a full table scan. But for tests that check each row on its own and do not compare it to other rows, it would be much more efficient to run only on the changed data.
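You can see why by looking at the SQL that dbt compiles for the test. Roughly (a simplified sketch, assuming a not_null test on id in a model named my_model):

-- simplified sketch of the compiled not_null test;
-- with no where config, every run scans the entire table
select id
from my_model
where id is null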

In the dbt Slack community you can find different solutions. Some of them work by adding a new merged_at column to the model and then referencing it in the test's where filter, as in the sketch below. But we were looking for something more generic.
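One such hand-rolled approach looks roughly like this; merged_at is a column each model has to maintain itself, the model name is a placeholder, and the dateadd syntax is Snowflake-flavored:

version: 2

models:
  - name: my_incremental_model
    columns:
      - name: id
        tests:
          - not_null:
              config:
                where: "merged_at > dateadd(hour, -6, current_timestamp)"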

Solution

The overview of the solution (details below) is that we keep a helper table to track the incremental tables. On every run, we update the helper table with the last incremental date. For the tests, we override a macro so that it reads the helper table and adds a filter per table according to the date stored there.

Helper table

We created a table, infra__incremental_model_latest_test_record, with the following columns:

MODEL_NAME               VARCHAR,
INCREMENTAL_COLUMN_NAME  VARCHAR,
LATEST_TESTED_RECORD     TIMESTAMP

Note: you cannot create standalone (non-model) tables with DBT out of the box; you need to either use an empty seed or a pre_hook to create the table, or add some utility to do it.
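For example, a minimal sketch using an on-run-start hook in dbt_project.yml (the INFRA schema and table name follow our convention, not a dbt built-in):

on-run-start:
  - "create table if not exists {{ target.database }}.INFRA.incremental_model_latest_test_record
     (MODEL_NAME varchar, INCREMENTAL_COLUMN_NAME varchar, LATEST_TESTED_RECORD timestamp)"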

Platform

Every time we run DBT on an incremental table we need to update the helper table. We do this by not running DBT directly, but only through an internal library that runs DBT and then adds functionality after the run, such as updating the helper table.
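Conceptually, the post-run step is just an upsert into the helper table for each incremental model that ran. A sketch of the statement (Snowflake-style MERGE; the model and column names are placeholders, and our internal library's mechanics are out of scope):

merge into INFRA.incremental_model_latest_test_record t
using (
    select 'my_incremental_model' as model_name,
           'event_time'           as incremental_column_name,
           (select max(event_time) from my_incremental_model) as latest_tested_record
) s
on upper(t.MODEL_NAME) = upper(s.model_name)
when matched then update
    set LATEST_TESTED_RECORD = s.latest_tested_record
when not matched then insert
    (MODEL_NAME, INCREMENTAL_COLUMN_NAME, LATEST_TESTED_RECORD)
    values (s.model_name, s.incremental_column_name, s.latest_tested_record)

Note that this has to run after the tests, so that the next test run only picks up rows added afterwards.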

DBT Testing

Here is where the magic begins:

DBT offers a lot of solutions in an AOP (aspect-oriented programming) manner: there are built-in macros whose default behavior you can override to fit your needs.

Since there is a need for where statements on tests, DBT has added the following feature:

version: 2

models:
  - name: large_table
    columns:
      - name: my_column
        tests:
          - accepted_values:
              values: ["a", "b", "c"]
              config:
                where: "date_column = current_date"
      - name: other_column
        tests:
          - not_null:
              where: "date_column < current_date"

So what DBT does is:

dbt replaces {{ model }} in generic test definitions with {{ get_where_subquery(relation) }}, where relation is a ref() or source() for the resource being tested. The default implementation of this macro returns:

- {{ relation }} when the where config is not defined (ref() or source())
- (select * from {{ relation }} where {{ where }}) dbt_subquery when the where config is defined
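For reference, the default macro in dbt-core looks roughly like this (paraphrased; check the dbt-core source of your version for the exact code):

{% macro default__get_where_subquery(relation) -%}
    {% set where = config.get('where', '') %}
    {% if where %}
        {%- set filtered -%}
            (select * from {{ relation }} where {{ where }}) dbt_subquery
        {%- endset -%}
        {% do return(filtered) %}
    {% else %}
        {% do return(relation) %}
    {% endif %}
{%- endmacro %}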

DBT also allows you to make global definitions that affect whole sections of your project; this can be done in the dbt_project.yml file. So we can now define a where statement that will affect all tests:

name: 'platform'
version: '1.0.0'

tests:
  +where: "__incremental_model__"

Now all that is left to do is override the default macro, get_where_subquery. This code will replace the where statement for incremental tests based on the helper table.

A sample of the code looks like this:

{%- set get_latest_tested_record_time -%}
    SELECT MODEL_NAME, INCREMENTAL_COLUMN_NAME, LATEST_TESTED_RECORD
    FROM {{ relation.database }}.INFRA.incremental_model_latest_test_record
    WHERE UPPER(MODEL_NAME) = UPPER('{{ relation.identifier }}')
{%- endset -%}

{% set latest_tested_record_time = run_query(get_latest_tested_record_time) %}

{# column 1 is the incremental column name, column 2 the last tested timestamp #}
{% set incremental_field = latest_tested_record_time.columns[1].values()[0] %}
{% set last_record_date_test = latest_tested_record_time.columns[2].values()[0] %}

{% set untested_records_filter = "/* incremental filter */ " ~ incremental_field ~ " > CAST('" ~ last_record_date_test.strftime("%Y-%m-%d %H:%M:%S") ~ "' AS TIMESTAMP)" %}

{% set where = untested_records_filter %}

{%- set filtered -%}
    (select * from {{ relation }} where {{ where }}) dbt_subquery
{%- endset -%}

{% do return(filtered) %}
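To wire this up, the sample above sits inside our override of the macro and should only take effect when the where config carries our __incremental_model__ marker; other values fall back to the default behavior. A sketch of the surrounding skeleton (our convention, not dbt's):

{% macro get_where_subquery(relation) -%}
    {% set where = config.get('where', '') %}
    {% if where == '__incremental_model__' %}
        {# the helper-table lookup and filter construction shown above go here,
           ending with the return of filtered #}
    {% elif where %}
        {% do return('(select * from ' ~ relation ~ ' where ' ~ where ~ ') dbt_subquery') %}
    {% else %}
        {% do return(relation) %}
    {% endif %}
{%- endmacro %}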

Summary

As you can see, DBT is a great tool, but it has limitations. By leveraging its macros and the data platform concept, you can extend DBT's capabilities well beyond what it offers out of the box.
