Building an incremental logs model in dbt
Ingesting Modelbit logs into Snowflake
To begin, make sure you are ingesting your API logs into Snowflake. This integration
will ingest your Modelbit API logs into a table called modelbit_logs_raw
that has two columns:
- data, a json column containing the logs data in a JSON array
- ingested_at, a timestamp column with the UTC timestamp at which the row was ingested into Snowflake
We'll use these columns to build our dbt model.
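If you'd like to peek at the raw data before modeling it, a query like this shows the most recently ingested rows. The table and column names below follow the description above; the exact layout your integration creates may differ.
select data, ingested_at
from modelbit_logs_raw
order by ingested_at desc
limit 10;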
Building an incremental model
By using an incremental dbt model, we can ensure that the work of parsing the JSON log lines is done only once, on a schedule, rather than re-done on every dbt run or on every user query.
To build an incremental dbt model called modelbit_logs_incremental
, use SQL like this:
{{
config(
materialized='incremental'
)
}}
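-- Flatten the JSON array in each raw row into one parsed log line per record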
with logs_json as (
select
try_parse_json(value::varchar) log_line,
ingested_at
from modelbit_logs_raw, table(flatten(input => data))
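-- On incremental runs, only parse rows ingested since the last dbt run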
{% if is_incremental() %}
where modelbit_logs_raw.ingested_at > (select max(ingested_at) from {{ this }})
{% endif %}
)
select
log_line:batchDurationMs::int as batch_duration_ms,
log_line:batchEndTs::int as batch_end_ts,
log_line:batchStartTs::int as batch_start_ts,
log_line:batchId::varchar as batch_id,
log_line:workspaceId::varchar as workspace_id,
log_line:totalDeploymentRuntimeMs::int as total_deployment_runtime_ms,
log_line:deploymentVersion::int as deployment_version,
log_line:deploymentName::varchar as deployment_name,
log_line:error::boolean as err,
log_line:resultCount::int as result_count,
log_line:source::varchar as method,
log_line:branch::varchar as branch,
log_line:alias::varchar as alias,
ingested_at
from logs_json
With this model in place, select * from modelbit_logs_incremental
will return all log lines from the beginning of time to
the last dbt run. Their JSON will have already been parsed out into a typed SQL table.
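For example, with the JSON already unpacked into typed columns, an illustrative rollup of error counts and batch durations per deployment is a cheap query:
select
  deployment_name,
  count(*) as total_batches,
  count_if(err) as error_batches,
  avg(batch_duration_ms) as avg_batch_duration_ms
from modelbit_logs_incremental
group by deployment_name
order by total_batches desc;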
Using a view for very recent log lines
Optionally, you can also make a dbt model, here called modelbit_logs_not_yet_unpacked, that's materialized as a view containing all log lines ingested since the last dbt run. This lets your SQL users see fully up-to-date logs.
The SQL for that dbt model is:
{{
config(
materialized='view'
)
}}
with logs_json as (
select
try_parse_json(value::varchar) log_line,
ingested_at
from modelbit_logs_raw, table(flatten(input => data))
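-- Only keep log lines ingested after the newest row already unpacked by the incremental model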
where modelbit_logs_raw.ingested_at > (select max(ingested_at) from {{ ref('modelbit_logs_incremental') }})
)
select
log_line:batchDurationMs::int as batch_duration_ms,
log_line:batchEndTs::int as batch_end_ts,
log_line:batchStartTs::int as batch_start_ts,
log_line:batchId::varchar as batch_id,
log_line:workspaceId::varchar as workspace_id,
log_line:totalDeploymentRuntimeMs::int as total_deployment_runtime_ms,
log_line:deploymentVersion::int as deployment_version,
log_line:deploymentName::varchar as deployment_name,
log_line:error::boolean as err,
log_line:resultCount::int as result_count,
log_line:source::varchar as method,
log_line:branch::varchar as branch,
log_line:alias::varchar as alias,
ingested_at
from logs_json
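Once this view is in place (named modelbit_logs_not_yet_unpacked here, to match the ref used below), a quick way to see how many log lines are still waiting to be folded into the incremental model is:
select count(*) as log_lines_since_last_run
from modelbit_logs_not_yet_unpacked;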
Putting both views together for easy querying
Finally, you can build a dbt model of all Modelbit logs, called modelbit_logs_all, that combines log lines that have already been unpacked for efficient querying with log lines that have yet to be unpacked, using a dbt view like this one:
{{
config(
materialized='view'
)
}}
select * from {{ ref('modelbit_logs_incremental') }}
union all select * from {{ ref('modelbit_logs_not_yet_unpacked') }}
With this view in place, the query select * from modelbit_logs_all
will query every log line from the beginning of time. It will:
- Efficiently query all log lines up to the last dbt run, whose JSON has already been unpacked
- Unpack, at query time, the JSON of all log lines ingested since that dbt run.
Whether to query only the pre-unpacked logs, or to query all of them, depends on the needs and performance constraints of your queries.
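As an illustration, a monitoring query over the combined view might look like the following; the one-day window and hourly bucketing are arbitrary choices:
select
  deployment_name,
  date_trunc('hour', ingested_at) as ingested_hour,
  count(*) as log_lines,
  avg(total_deployment_runtime_ms) as avg_runtime_ms
from modelbit_logs_all
where ingested_at > dateadd('day', -1, current_timestamp())
group by 1, 2
order by 1, 2;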