Building an incremental logs model in dbt

Ingesting Modelbit logs into Snowflake

To begin, make sure you are ingesting your Modelbit API logs into Snowflake. That integration writes the logs into a table called modelbit_logs_raw with two columns:

  • data, a JSON column containing the log lines in a JSON array
  • ingested_at, a timestamp column recording the UTC time at which the row was ingested into Snowflake.

We'll use these columns to build our dbt model.
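
For reference, here's a minimal sketch of what that raw table might look like, plus a query to peek at the most recent rows. The exact DDL is whatever the integration creates; the column names and types below simply mirror the description above.

create table if not exists modelbit_logs_raw (
  data variant,              -- JSON array of Modelbit API log lines
  ingested_at timestamp_ntz  -- UTC timestamp when the row landed in Snowflake
);

-- Inspect the most recently ingested rows
select data, ingested_at
from modelbit_logs_raw
order by ingested_at desc
limit 10;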

Building an incremental model

By using an incremental dbt model, the work of parsing each JSON log line is done only once, on a schedule, rather than being repeated across the full history on every dbt run or on every user query.

To build an incremental dbt model called modelbit_logs_incremental, use SQL like this:

models/modelbit_logs_incremental.sql
{{
  config(
    materialized='incremental'
  )
}}

with logs_json as (

  select
    -- Each element of the data array is one raw log line; parse it into a variant
    try_parse_json(value::varchar) as log_line,
    ingested_at

  from modelbit_logs_raw, table(flatten(input => data))

  {% if is_incremental() %}
  -- On incremental runs, only parse rows ingested since the last dbt run
  where modelbit_logs_raw.ingested_at > (select max(ingested_at) from {{ this }})
  {% endif %}

)

select
  log_line:batchDurationMs::int as batch_duration_ms,
  log_line:batchEndTs::int as batch_end_ts,
  log_line:batchStartTs::int as batch_start_ts,
  log_line:batchId::varchar as batch_id,
  log_line:workspaceId::varchar as workspace_id,
  log_line:totalDeploymentRuntimeMs::int as total_deployment_runtime_ms,
  log_line:deploymentVersion::int as deployment_version,
  log_line:deploymentName::varchar as deployment_name,
  log_line:error::boolean as err,
  log_line:resultCount::int as result_count,
  log_line:source::varchar as method,
  log_line:branch::varchar as branch,
  log_line:alias::varchar as alias,
  ingested_at

from logs_json

With this model in place, select * from modelbit_logs_incremental will return every log line from the beginning of time through the last dbt run, with the JSON already parsed out into a typed SQL table.
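
For example, here's an illustrative query against the parsed columns. The column names come from the model above; the aggregation itself is just a sketch of the kind of analysis this makes cheap:

select
  deployment_name,
  count(*) as total_batches,
  avg(batch_duration_ms) as avg_batch_duration_ms,
  sum(iff(err, 1, 0)) as error_count
from modelbit_logs_incremental
group by deployment_name
order by total_batches desc;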

Using a view for very recent log lines

Optionally, you can also make a dbt model, materialized as a view, that contains all log lines ingested since the last dbt run. This lets your SQL users see fully up-to-date logs.

The SQL for that dbt model is:

models/modelbit_logs_not_yet_unpacked.sql
{{
  config(
    materialized='view'
  )
}}

with logs_json as (

  select
    -- Same parsing as the incremental model, but computed at query time
    try_parse_json(value::varchar) as log_line,
    ingested_at

  from modelbit_logs_raw, table(flatten(input => data))

  -- Only rows ingested after the last row the incremental model has processed
  where modelbit_logs_raw.ingested_at > (select max(ingested_at) from {{ ref('modelbit_logs_incremental') }})

)

select
  log_line:batchDurationMs::int as batch_duration_ms,
  log_line:batchEndTs::int as batch_end_ts,
  log_line:batchStartTs::int as batch_start_ts,
  log_line:batchId::varchar as batch_id,
  log_line:workspaceId::varchar as workspace_id,
  log_line:totalDeploymentRuntimeMs::int as total_deployment_runtime_ms,
  log_line:deploymentVersion::int as deployment_version,
  log_line:deploymentName::varchar as deployment_name,
  log_line:error::boolean as err,
  log_line:resultCount::int as result_count,
  log_line:source::varchar as method,
  log_line:branch::varchar as branch,
  log_line:alias::varchar as alias,
  ingested_at

from logs_json

Putting both models together for easy querying

Finally, you can build a dbt model of all Modelbit logs, covering both the log lines that have already been unpacked for efficient querying and those that have yet to be unpacked, with a dbt view like this one:

models/modelbit_logs_all.sql
{{
  config(
    materialized='view'
  )
}}

-- Log lines already unpacked by the incremental model
select * from {{ ref('modelbit_logs_incremental') }}

union all

-- Recent log lines unpacked at query time
select * from {{ ref('modelbit_logs_not_yet_unpacked') }}

With this view in place, the query select * from modelbit_logs_all will query every log line from the beginning of time. It will:

  • Efficiently read all log lines up to the last dbt run, whose JSON has already been unpacked
  • Unpack the JSON of all log lines ingested since that dbt run at query time.

Whether to query only the pre-unpacked logs, or to query all of them, depends on the needs and performance constraints of your queries.
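
As a rough illustration of that tradeoff, both queries below are just examples built on the models above:

-- Heavy historical aggregation: query only the pre-unpacked incremental table
select
  deployment_name,
  avg(total_deployment_runtime_ms) as avg_runtime_ms
from modelbit_logs_incremental
group by deployment_name;

-- Fully up-to-date view of the last hour: include not-yet-unpacked log lines
select deployment_name, batch_id, err, ingested_at
from modelbit_logs_all
where ingested_at > dateadd('hour', -1, current_timestamp())
order by ingested_at desc;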