This post is also available on the Materialize blog.
Materialize is a distributed SQL database built on streaming internals. With it, you can use the SQL you are already familiar with to build powerful stream processing capabilities. But as with any abstraction, details of the underlying implementation sometimes leak through. Queries that look simple and innocent when you formulate them in SQL can require more resources than expected when evaluated incrementally against a continuous stream of arriving updates.
In this post we set out to peek behind the curtain of Materialize’s internals. You’ll get a better understanding of how SQL queries can be evaluated efficiently by Materialize. You’ll also learn where it makes sense to think about ways to restructure queries to reduce their resource requirements and improve their performance.
This post is not meant as a deep dive into the underlying foundation of Materialize, though. There are other places that cover the details in much more depth than what is required for our purposes. We want to lift the curtain just enough to develop an intuition about what happens under the hood to better understand which queries work well and which queries require additional thought and optimizations to run efficiently.
Incremental view maintenance and time-varying collections
One of Materialize’s core strengths is fast and efficient incremental view maintenance. A materialized view contains the result of a pre-defined query and Materialize will proactively update the result whenever the inputs of the query change. But instead of recomputing the materialized views from scratch when their inputs change, Materialize will just look at the update that changed the input and use it to update the materialized view incrementally.
These updates are represented internally by triples (data, time, diff). You can think of data as the row of a table. diff indicates whether the row was added or removed, and time indicates when this update happened according to some time that is internal to Materialize. For basic inserts and deletes the diff is +1 and -1, respectively. But as updates are gathered in a multiset and the diff encodes the change in multiplicity, the diff can be any nonzero integer. Changes to parts of a row are represented by two updates happening at the same time, one to remove the original row and one to insert the updated row.
A table in Materialize is internally represented as a collection of these updates (so-called time-varying collections). This representation naturally enables incremental query evaluation: queries consume updates as their inputs and yield updates as result (which can in turn form the input of other queries). When all updates to a table are preserved in the collection, the historic version of a table at time t can be reconstructed from the updates that happened before time t. But although this flexibility is great from a theoretical point of view, it’s not practical to reconstruct a table from scratch each time a query is evaluated.
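As a rough illustration of this representation, the following Python sketch stores a handful of update triples and reconstructs the table as of a given time by summing diffs. The rows, the integer timestamps, and the `snapshot` helper are made up for this example and are not part of Materialize; the sketch also assumes that "as of time t" includes the updates stamped exactly t.

```python
from collections import Counter

# Each update is a (data, time, diff) triple. Rows and times are hypothetical.
updates = [
    (("alice", 10), 1, +1),  # insert a row at time 1
    (("bob", 20), 2, +1),    # insert another row at time 2
    (("alice", 10), 3, -1),  # change alice's row at time 3: remove the old row ...
    (("alice", 15), 3, +1),  # ... and insert the updated row at the same time
]


def snapshot(updates, t):
    """Reconstruct the multiset of rows as of time t by summing diffs."""
    rows = Counter()
    for data, time, diff in updates:
        if time <= t:  # assumption: updates stamped exactly t are included
            rows[data] += diff
    # Rows whose multiplicity dropped to zero are no longer part of the table.
    return {data: count for data, count in rows.items() if count != 0}


print(snapshot(updates, 2))  # alice's original row and bob's row
print(snapshot(updates, 3))  # bob's row and alice's updated row
```

Note that every call walks the full list of updates, which is exactly the cost that makes reconstructing a table from scratch impractical for large histories.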
Materialize therefore uses a more efficient representation to keep track of updates called partial time-varying collections (pTVCs). pTVCs still reflect the effect of every update, but they consolidate updates that happened before a time bound called since into a more efficient representation through a process called compaction. The consolidated pTVC generally contains fewer individual updates, which results in improved query performance. This comes at the cost of limiting the time for which the historic versions of the table are preserved. Queries can still access the historic versions of a table, but only for times larger than since. In an extreme case, a pTVC degenerates to the representation of tables as it is used in common databases (when since is always equal to the current wall clock time).
For instance, consider row1 from the diagram above that is added to a table at time t1 and removed again at time t3. The effects of these two updates cancel each other out. So if t1 < t3 <= since, both of these updates can be removed from the pTVC. Because the pTVC does not need to preserve the complete history of updates that happened before since, only the effect of the updates, the pTVC no longer needs to record that the row was added and removed again later. Similarly, in the following diagram, row1 is added twice, at times t1 and t3. As soon as t1 <= t3 <= since, it is sufficient to know that the row has been added twice. So the pTVC only needs to contain a single update with a diff of +2.
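The two cases above can be sketched in a few lines of Python. This is a simplified model, not Materialize's implementation: the `compact` function is hypothetical, and stamping the consolidated updates with the value of `since` is an assumption made for this sketch.

```python
from collections import defaultdict


def compact(updates, since):
    """Consolidate all updates with time <= since; keep later updates untouched."""
    consolidated = defaultdict(int)
    later = []
    for data, time, diff in updates:
        if time <= since:
            consolidated[data] += diff
        else:
            later.append((data, time, diff))
    # Assumption of this sketch: net changes are recorded at `since`.
    # Rows whose diffs cancel out disappear entirely.
    compacted = [(data, since, diff) for data, diff in consolidated.items() if diff != 0]
    return compacted + later


added_then_removed = [("row1", 1, +1), ("row1", 3, -1)]
added_twice = [("row1", 1, +1), ("row1", 3, +1)]

print(compact(added_then_removed, since=3))  # []
print(compact(added_twice, since=3))         # [('row1', 3, 2)]
print(compact(added_twice, since=2))         # [('row1', 2, 1), ('row1', 3, 1)]
```

The last call shows the trade-off: as long as since has not advanced past t3, the second update has to be kept as an individual entry.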
If you want to learn more, this overview is a great starting point with references to relevant research papers. But for now we know enough to understand how this affects resource requirements of queries and sources.
Sources and source envelopes
Sources describe continually changing collections of data that Materialize persists in the storage layer. Inside the storage layer, the data is represented as a collection of updates (data, time, diff) as part of a pTVC.
Materialize knows how to deserialize common data formats and stamps each update with its ingestion time. For the following discussion, it is only important to understand how Materialize interprets the change that an ingested record represents, i.e., the diff of the update. So we mostly ignore data and time from here on and focus on diff instead.
The way a change from the source should be interpreted is determined by the envelope of the source. Materialize currently supports three different envelopes: append-only, upsert, and Debezium. Depending on these envelopes, compaction of pTVCs can be more or less effective.
Append-only envelope
Sources with an append-only envelope never delete or retract any data. This means that the diff of any change that the source observes is always going to be +1. Append-only sources are therefore great if you want to run historic analysis on your data. However, if all data is kept forever, compaction of the underlying pTVC will not be very effective as there are no updates to remove or consolidate. Only when an identical row is added twice and since has advanced far enough can compaction replace the two updates with a single update with a diff of +2. But this is rather rare for real-world data sources and therefore has only a marginal effect on the required storage of the underlying pTVC. We will see in the following how that affects query performance and recovery time.
Upsert and Debezium envelope
Sources with an upsert envelope are more elaborate. The upsert envelope treats all records as having a key and a value. A new record for a key overwrites the previous value of that key. For instance, if a record with key k and value v1 is observed at time t1 and later at time t2 another record with key k and value v2 is observed, the value of k is eventually overwritten with v2.
This aligns well with the compaction mechanism that is used by pTVCs. In the last example, three updates are initially stored in the pTVC: one update that adds the value v1 at t1 and, at time t2, two more updates that remove v1 and add v2. But once the lower bound since has advanced beyond t2, the two updates that add and remove v1 can be compacted away.
Materialize also provides a dedicated Debezium envelope that describes the decoded records’ old and new values with a Debezium-like diff structure. The envelope is fairly similar to the upsert envelope; it is just adapted to support the specific data schema that is used by Debezium.
Storage usage of time-varying collections
The envelope of a source has direct implications for the storage usage of the underlying pTVC. Append-only sources only ever append data to a pTVC, which limits the effectiveness of compaction and results in ever-growing storage usage. In contrast, upsert and Debezium sources often update the same key. Therefore, compaction keeps the size of pTVCs of these sources proportional to the key space, which is generally much smaller than the number of changes observed by the source.
It thus makes a substantial difference whether 100 TB of data are ingested by a source with an append-only or an upsert envelope. In case of an append-only envelope, the pTVC will require roughly 100 TB in storage. In contrast, with an upsert envelope, the total amount of ingested data is less relevant. Assuming that there are 1 million keys and each record has a size of 1 kB, the pTVC can be compacted to roughly 1 GB, regardless of whether the source is ingesting 100 TB per year or per day.
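The back-of-the-envelope numbers from this example can be reproduced with a few lines of Python. The variable names are chosen for this sketch, and decimal units (1 kB = 1,000 bytes) are assumed.

```python
TB = 10**12
GB = 10**9
KB = 10**3

ingested_bytes = 100 * TB  # total volume ingested by the source
unique_keys = 1_000_000    # key space of the upsert source
record_bytes = 1 * KB      # size of a single record

# Append-only: every ingested record stays in the pTVC.
append_only_bytes = ingested_bytes
# Upsert: after compaction, roughly one record per key remains.
upsert_bytes = unique_keys * record_bytes

print(append_only_bytes / TB)  # 100.0 (TB)
print(upsert_bytes / GB)       # 1.0 (GB)
```

The compacted upsert size depends only on the number of keys and the record size, not on how much data has been ingested in total.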
This effect is not specific to sources, though. pTVCs that can observe retractions generally compact better and use less storage than those that don’t. The persisted size of a materialized view is proportional to the logical size of the relation it maintains. For instance, if the output of a materialized view does not observe retractions, the storage usage of the pTVC that persists the result of the materialized view does not compact well and will keep growing over time.
Query latency when reading from storage
Materialize is not limited to incremental view maintenance. It also supports ad-hoc queries that are submitted by users through a SQL client of their choice. When those queries are reading from a predefined index, they can make point lookups from the precomputed results that are stored in memory and respond quickly to the user’s request.
But ad-hoc queries are not limited to querying indexes. In Materialize, you can also run ad-hoc queries against sources, tables, and materialized views. In that case, the query reads the content of the underlying pTVCs from storage. Therefore, the size of the pTVCs directly impacts the query performance. If an ad-hoc query needs to read through 100 TB of data from storage, you need to be prepared to wait some time for the query to come back with an answer.
Today, Materialize will always read the complete pTVC from storage. This works well if the query needs to scan through the majority of the data. But it’s inefficient for selective queries that are only reading a small subset of the data. This is not a fundamental limitation of pTVCs, though, and we will discuss potential optimizations in the following sections.
Processing lag during failure recovery
Materialized views are continuously updated when their inputs change. Querying a materialized view often means just retrieving the already precomputed answer. In this way, the computational cost of maintaining materialized views is amortized over time.
For instance, if 100 TB of data are ingested over the course of a year, on average a bit over 3 MB of data per second need to be processed to keep a materialized view up to date. In contrast, an ad-hoc query to the same data would require reading through the 100 TB all at once. Even with a massive cluster with 32 nodes and 64 cores each, every core would need to process 50 GB per second to achieve an overall response time of 1 second. So for a steady state workload, maintaining materialized views has much lower resource requirements because the data arrives gradually over time and does not need to be processed all in one go.
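The arithmetic behind these two figures is short enough to check directly. The following Python lines assume decimal units and a 365-day year; the variable names are made up for this sketch.

```python
TB = 10**12
MB = 10**6
GB = 10**9

total_bytes = 100 * TB
seconds_per_year = 365 * 24 * 60 * 60  # 31,536,000 seconds

# Steady state: the data arrives gradually over the course of a year.
steady_bytes_per_second = total_bytes / seconds_per_year

# Ad-hoc query: all data has to be processed within one second.
cores = 32 * 64  # 32 nodes with 64 cores each
per_core_bytes_per_second = total_bytes / cores

print(round(steady_bytes_per_second / MB, 2))    # about 3.17 MB per second
print(round(per_core_bytes_per_second / GB, 1))  # about 48.8 GB per second per core
```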
However, it’s important to understand that the size of a pTVC impacts the time it takes to recover from failure. During recovery, Materialize reprocesses all historic updates from a recent compacted pTVC before it moves on to processing the most recent updates. This ensures that all updates that should have been produced by a query have actually been persisted into storage and no update was skipped because of the event that caused the failure recovery. This self-correcting mechanism is great for correctness, but it introduces some processing lag during recovery, where new events need to wait until recovery is completed before they are processed. So even if a materialized view only needs to process 3 MB per second in a healthy cluster, if the cluster fails (e.g., because of a deployment during the maintenance window) it needs to reprocess the complete pTVC.
Let’s assume the example above is using a source with an append-only envelope and 3 MB of data are ingested per second. If data has accumulated for an entire year, the materialized view would need to process roughly 100 TB to rehydrate during recovery. In contrast, with a source with an upsert envelope and assuming 1 million unique keys and a record size of 1 kB, rehydration only needs to process roughly 1 GB of data.
To speed up recovery you can add a second, larger replica that is able to complete the hydration process faster and then continues to process the most recent updates. This can substantially reduce the processing lag caused by recovery, but comes at an additional cost. Once the smaller steady-state replica has caught up, you can remove the larger replica to save on cost.
Optimizing ad-hoc query performance and rehydration time
In addition to using more hardware resources for faster recovery, the team is working on various internal improvements to improve ad-hoc query performance and reduce rehydration time.
When using append-only data, users are often only interested in the recent history. Depending on the use-case this can mean a couple of days or weeks. The team is tracking efforts that allow users to specify what data the source should retain. So instead of storing everything indefinitely, users can control how long data should be stored to remove historic data from a pTVC that is no longer needed, even for sources with an append-only envelope. As part of a similar effort, the team is also looking into ways to push down predicates and projections directly into the storage layer so that irrelevant data is filtered efficiently directly in the storage layer.
Both efforts effectively reduce the amount of data that needs to be kept or read from storage. In this way, ad-hoc queries can return faster as they have less (irrelevant) data to process. For the same reason, these mechanisms also improve rehydration time of materialized views and indexes.
Memory footprint of queries
When queries are evaluated incrementally, they often need to keep some form of state to track their progress and emit correct updates. Some queries are stateless and require little to no memory whereas others have a substantial memory footprint that depends on the amount of data they are reading from their inputs. For some queries there are even different implementations for different envelope types that have different memory requirements.
Consider, for instance, the following query that simply counts the number of records in a table.
SELECT count(*) FROM input
Regardless of the amount of data in input, the query only needs to track the current total record count, which takes just a couple of bytes in memory.
The next query determines the max value in input. Although it appears very similar to the previous query, the memory footprint actually depends on the envelope type.
SELECT max(value) FROM input
If input can’t contain retractions, e.g. when the corresponding source uses an append-only envelope, things are straightforward. Whenever a new update is added to the underlying pTVC, the query can just check if the added value is larger than the current max value and emit an update if required. However, if input can contain retractions, e.g. when the corresponding source uses an upsert or Debezium envelope, it’s not sufficient to only track the largest value that has been observed so far. Because updates that added the current maximum value can be retracted at any time, the operator needs to maintain the count for all values that have been previously observed.
So when retractions can happen, the memory required by the query is proportional to the number of unique values, whereas without retractions the required memory is just a few bytes to maintain the current max. Note that having different implementations for the same operator is not specific to max. In general, queries that are using aggregations can often be evaluated more efficiently if their inputs are known to be free of retractions.
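To make the difference concrete, here is a minimal Python sketch of the two strategies. The class names are hypothetical and the code only models the state that has to be kept; it is not how Materialize implements the operator (for instance, a real implementation would not rescan all values to find the new maximum).

```python
from collections import Counter


class AppendOnlyMax:
    """Max over an input without retractions: a single value of state."""

    def __init__(self):
        self.current = None

    def update(self, value, diff):
        if diff <= 0:
            raise ValueError("append-only inputs never retract")
        if self.current is None or value > self.current:
            self.current = value
        return self.current


class RetractableMax:
    """Max over an input with retractions: one counter per distinct value."""

    def __init__(self):
        self.counts = Counter()

    def update(self, value, diff):
        self.counts[value] += diff
        if self.counts[value] == 0:
            del self.counts[value]  # the value is gone from the input
        # Simplification: scan all remaining values to find the maximum.
        return max(self.counts) if self.counts else None


m = RetractableMax()
print(m.update(5, +1))  # 5
print(m.update(9, +1))  # 9
print(m.update(9, -1))  # 5, only recoverable because 5 was remembered
```

Once the update that added 9 is retracted, the operator has to fall back to 5. `AppendOnlyMax` would already have forgotten that value, which is why it is only correct for inputs without retractions.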
Joins are at the high end of the memory requirement spectrum. To correctly evaluate streaming joins, Materialize keeps the data of all their inputs in memory.
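The following Python sketch hints at why a streaming join needs its inputs in memory. It models an equi-join of two inputs on a key and omits timestamps for brevity; the `IncrementalJoin` class is a hypothetical simplification and not Materialize's actual join operator.

```python
from collections import defaultdict


class IncrementalJoin:
    """Equi-join of two inputs on a key, maintained incrementally."""

    def __init__(self):
        # Both inputs are kept in memory: key -> {value: multiplicity}.
        self.left = defaultdict(lambda: defaultdict(int))
        self.right = defaultdict(lambda: defaultdict(int))

    def update_left(self, key, value, diff):
        # A change on the left is matched against everything the right holds for the key.
        out = [((key, value, r), diff * n)
               for r, n in self.right.get(key, {}).items() if n != 0]
        self.left[key][value] += diff
        return out

    def update_right(self, key, value, diff):
        # Symmetric case: match the change against the stored left input.
        out = [((key, l, value), diff * n)
               for l, n in self.left.get(key, {}).items() if n != 0]
        self.right[key][value] += diff
        return out


join = IncrementalJoin()
print(join.update_left("k", "a", +1))   # [] since there is no matching right row yet
print(join.update_right("k", "x", +1))  # [(('k', 'a', 'x'), 1)]
print(join.update_left("k", "a", -1))   # [(('k', 'a', 'x'), -1)]
```

Because an update on either side can match any row that previously arrived on the other side, neither input can be discarded after it has been processed.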
Optimizing the memory footprint of queries
There are two common ways to reduce the memory footprint of queries: first, reduce the amount of data that is relevant to answer the query and, second, share data that needs to be kept in memory between several queries.
Using filters and projections in queries naturally helps to reduce the data that needs to be processed. The query optimizer pushes down filters and projections in the query plan where possible. As a result, data that is not relevant to answer a query is removed early and, e.g., in case of joins, is not loaded into memory.
If multiple queries are concurrently accessing the same data in memory, the overall memory requirement per query can be reduced by loading data into memory only once and sharing it between queries. Creating indexes is a way to achieve that with Materialize. Internally, creating an index creates a shared arrangement that can be used by multiple queries concurrently. In this way, the data needs to be stored in memory only once and queries, in particular joins, can reuse the data and don’t need to store it in memory multiple times.
Further and more advanced techniques to optimize joins can be found in this blog post on optimizing joins with indexes and late materialization.
Memory footprint of sources
So far, we’ve been focussing on the resource requirements of queries. But sources also need resources to run. Similar to how different envelopes impact the resource requirements of queries, envelopes also impact the resource requirements of sources.
Sources with an append-only envelope simply add the changes they observe to a pTVC with a diff of +1. They are almost stateless as they are just writing updates into storage and don’t need additional context to know that the diff of the update is +1. This requires very little memory.
Sources with an upsert envelope interpret records as having a key and a value. But by just looking at an individual upsert record, it is impossible to tell whether something is an insert, update, or delete. Consider, for instance, a record with a key k. Looking at this record in isolation, it’s not clear if it represents an insert (when the key has not been observed before) or an update (when the key has been inserted or updated before). In a nutshell, to disambiguate these and other cases, the source needs to remember the last update for every key it has ever seen. Further details of this conversion are explained in this blog post on upserts in Differential Dataflow.
Today, sources with an upsert envelope keep the last update of all keys in memory. Therefore, the memory requirement of sources with an upsert envelope is proportional to the number of unique keys of that source. The team is already prioritizing work to reduce the resource requirements of upsert sources to bounded, constant memory.
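A minimal Python sketch of this conversion shows where the per-key state comes from. The `UpsertSource` class is hypothetical, and treating a value of `None` as a delete is an assumption of this sketch rather than a description of Materialize's internals.

```python
class UpsertSource:
    """Turns upsert records (key, value) into (data, time, diff) updates."""

    def __init__(self):
        # Last value per key. This dictionary grows with the number of unique keys.
        self.last_value = {}

    def ingest(self, key, value, time):
        updates = []
        if key in self.last_value:
            # The key was seen before: retract the previous row first.
            updates.append(((key, self.last_value[key]), time, -1))
        if value is None:
            # Assumption of this sketch: a missing value denotes a delete.
            self.last_value.pop(key, None)
        else:
            updates.append(((key, value), time, +1))
            self.last_value[key] = value
        return updates


source = UpsertSource()
print(source.ingest("k", "v1", 1))  # [(('k', 'v1'), 1, 1)]
print(source.ingest("k", "v2", 2))  # [(('k', 'v1'), 2, -1), (('k', 'v2'), 2, 1)]
print(source.ingest("k", None, 3))  # [(('k', 'v2'), 3, -1)]
```

Without `last_value`, the second record could not be translated into a retraction of v1, because the record itself only carries the new value.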
The memory utilization of sources with a Debezium envelope behaves like that of upsert sources. Debezium records contain information about the change captured, like before for the old value and after for the updated value of a key. In general, it seems like this information is enough for the source to emit inserts, updates, and deletes. But Debezium may produce duplicate records if the connector is interrupted. Because Materialize makes a best-effort attempt to detect and filter out duplicates, it needs to keep messages in memory. Therefore, sources with a Debezium envelope have memory requirements that are proportional to the number of unique keys.
Last but not least there is the Postgres source that can directly ingest data from a Postgres database. It consumes directly from the logical replication log of the database. This has the advantage that it does not require additional infrastructure, like Apache Kafka or Redpanda, to ingest the raw CDC data. The source also respects transaction boundaries from the database. Materialize currently requires the replication log to be configured with REPLICA IDENTITY FULL. In this way, the replication message format contains the old and the new data of the row and therefore the Postgres source does not need to remember previous updates of a key. It therefore has low memory requirements, similar to sources with append-only envelopes.
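When each change already carries the old and the new row, the conversion into updates needs no per-key state. The following Python sketch illustrates the idea; the `change_to_updates` function and the row layout are hypothetical and do not reflect the actual replication message format.

```python
def change_to_updates(before, after, time):
    """Convert one change carrying the old and new row into (data, time, diff) updates."""
    updates = []
    if before is not None:
        updates.append((before, time, -1))  # retract the old row
    if after is not None:
        updates.append((after, time, +1))   # insert the new row
    return updates


print(change_to_updates(None, ("k", "v1"), 1))         # insert
print(change_to_updates(("k", "v1"), ("k", "v2"), 2))  # update
print(change_to_updates(("k", "v2"), None, 3))         # delete
```

In contrast to the upsert case, the function does not consult any remembered state: everything needed to emit the retraction is part of the change itself.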
Irrespective of the memory requirements of sources, larger sources generally have more resources and can therefore ingest a higher volume of records. So for high-throughput sources, it can make sense to scale up sources regardless of their memory consumption to give them more compute resources to keep up with ingesting the observed updates into the storage layer.
Summary
Evaluating queries and incrementally maintaining materialized views and indexes naturally requires resources. But as we have seen in this post, depending on whether the data can contain retractions or not, there are often different optimizations available that can be applied when evaluating queries or when data is persisted in the storage layer.
pTVCs that can contain retractions generally compact well and require less storage. In particular for CDC-type data, the size of the pTVC is bounded by the number of keys, which is independent of the number of updates observed for the collection. A smaller storage footprint directly reduces the memory requirements of incrementally evaluated joins, because the input of queries with joins is kept in memory. The smaller the size of the pTVC, the better for the memory consumption of the join. However, in the presence of retractions, the memory footprint that is required to keep internal state for some queries, like aggregations, can be substantially higher compared to the same query that reads data that is free of retractions.
pTVCs that don’t observe retractions grow indefinitely and there is no upper bound for their size. This enables historic analysis of the data, but although Materialize scales out well, it can become cost prohibitive to run workloads that regularly need to process 10s or 100s of TB. But interestingly, the memory footprint that is required to keep the internal state of queries can be substantially smaller in the absence of retractions.
In the future, Materialize will be able to move computations into the sources as well as into the storage layer. In particular, the capability of pushing down filters and projections into the storage layer will make queries on append-only data more economical when only the most recent subset of data is relevant for answering queries.
Equipped with this knowledge, you are now able to better understand the resource requirements of sources and queries, which is valuable when designing queries and sizing a cluster. And if you have any questions or would like to chat through any of these concepts, we’d be happy to connect in Slack or on Twitter.