-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFC: Support SELECT on SUBSCRIPTION #88
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
--- | ||
feature: select_from_subscription | ||
authors: | ||
- "hzxa21" | ||
start_date: "2024/05/08" | ||
--- | ||
|
||
# | ||
|
||
## Motivation | ||
|
||
This RFC aims to provide a solution to enable user to run query on a changelog stream from a MV/Table, like https://github.com/risingwavelabs/rfcs/pull/83, | ||
but with a different syntax. It uses the `subscription` concept instead of a TVF to represent the changelog stream. | ||
|
||
Let's first recap on the current usage on `subscription`: | ||
```SQL | ||
CREATE TABLE t (pk int PRIMARY KEY, v int); | ||
|
||
CREATE SUBSCRIPTION sub FROM t WITH ( | ||
retention = '1D' | ||
); | ||
|
||
DECLARE cur SUBSCRIPTION CURSOR FOR sub; | ||
|
||
FETCH NEXT FROM cur; | ||
pk | v | op | rw_timestamp | ||
-------+-------+----+-------------- | ||
25184 | 25184 | 1 | | ||
(1 row) | ||
|
||
FETCH NEXT FROM cur; | ||
t.pk | t.v | t.op | rw_timestamp | ||
-------+-------+------+--------------- | ||
18704 | 18704 | 2 | 1715021447040 | ||
(1 row) | ||
|
||
... | ||
|
||
``` | ||
|
||
Basically we support `FETCH NEXT FROM <cursor>` on a `subscription` to get the changelog stream with schema `(original schema | op | rw_timestamp)`. | ||
|
||
|
||
This RFC will extend the usage of `subscription` to support `SELECT` query on it as well. The output schema of the SELECT query is `(original schema | op | rw_timestamp)`. | ||
|
||
## Design | ||
### Batch SELECT on subscription | ||
```SQL | ||
CREATE TABLE t (pk int PRIMARY KEY, v int); | ||
|
||
INSERT INTO t values (1, 1); | ||
|
||
CREATE SUBSCRIPTION sub FROM t WITH ( | ||
retention = '1D' | ||
); | ||
|
||
|
||
INSERT INTO t VALUES (2, 2); | ||
|
||
UPDATE t SET v = 11 WHERE pk = 1; | ||
|
||
# op = 1 insert | ||
# op = 2 delete | ||
# op = 3 update_insert | ||
# op = 4 update_delete | ||
SELECT * FROM sub; | ||
t.pk | t.v | t.op | rw_timestamp | ||
-------+-------+------+--------------- | ||
2 | 2 | 1 | 1715021447030 | ||
1 | 1 | 4 | 1715021447040 | ||
1 | 11 | 3 | 1715021447040 | ||
(3 rows) | ||
|
||
SELECT t.pk as pk, t.v as v, CASE WHEN t.op = 2 or t.op = 4 THEN TRUE ELSE FALSE END as is_delete FROM sub; | ||
pk | v | is_delete | ||
------+-------+------------ | ||
2 | 2 | f | ||
1 | 1 | t | ||
1 | 11 | f | ||
(3 rows) | ||
``` | ||
|
||
- Similar to the requirement for CURSOR query, `retention` must be specified when creating a subscription, that is eligble for batch select. | ||
- Unlike CUSOR query, batch select won't see historical snapshot data when the subscription is created. It only sees the data after the subscription is created. Technically the following two queries are equivalent: | ||
```SQL | ||
SELECT * FROM sub; | ||
|
||
# equivalent to FETCH NEXT from cursor created with since begin() till rw_timestamp == now() | ||
|
||
DECLARE cur SUBSCRIPTION CURSOR FOR sub SINCE BEGIN(); | ||
FETCH NEXT FROM cur; | ||
... | ||
``` | ||
|
||
Use case: one-shot inspection on the MV/Table changelog for debugging | ||
|
||
|
||
### Streaming SELECT on subscription | ||
```SQL | ||
CREATE TABLE t (pk int PRIMARY KEY, v int); | ||
|
||
CREATE SUBSCRIPTION sub FROM t; | ||
|
||
CREATE MATERIALIZED VIEW t_changelog AS | ||
SELECT * FROM sub; | ||
|
||
CREATE SINK t_changelog_sink | ||
AS SELECT t.pk as pk, t.v as v, CASE WHEN t.op = 2 or t.op = 4 THEN TRUE ELSE FALSE END as is_delete FROM sub; | ||
``` | ||
|
||
- User can create sink/mv on the subscription to get the changelog stream of the MV/Table the subscription is created upon. | ||
- `retention` is optional for subscription eligble for streaming query. | ||
- When `retention` is set, the corresponding log store will be created and the subscription is eligble for cursor query, batch select query, and streaming query. | ||
- When `retention` is not set, no log store will be created for the subscription and the subscription is only eligble for streaming query. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussion: should we have a default retention like 10mins when retention is not set for convenience to make all subscriptions eligible for cursor query, batch select query and streaming query? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my initial design, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the "create sink/mv from subscription" use case, we don't necessarily need to buffer the changelog data under global checkpoint. The reason why I make |
||
- A streaming projection executor will be created to convert schema of the upstream stream chunk to `(original schema | op | rw_timestamp)` for streaming query on subscription. Implementation-wise, this is essentially the table value function mentioned in https://github.com/risingwavelabs/rfcs/pull/83. | ||
|
||
#### Use-case 1: Alerting MV/Sink based on the first event passing a threshold | ||
(this example is borrowed from https://github.com/risingwavelabs/rfcs/pull/83) | ||
```SQL | ||
CREATE SOURCE events_source( | ||
id: int, | ||
msg: varchar, | ||
status: varchar, | ||
event_time timestamptz, | ||
proc_time timestampz AS proctime() | ||
); | ||
|
||
CREATE MATERIALIZED VIEW events as | ||
SELECT * FROM events_source | ||
WHERE proc_time > NOW() - INTERVAL '7 days' | ||
ORDER BY event_time; | ||
|
||
CREATE MATERIALIZED VIEW windowed_result as | ||
SELECT | ||
window_start, | ||
count(*) as all_count, | ||
count(*) filter(where status = 'error') as error_count | ||
FROM TUMBLE (events, event_time, INTERVAL '1 MINUTES') | ||
GROUP BY window_start; | ||
|
||
CREATE SUBSCRIPTION windowed_result_sub from windowed_result; | ||
|
||
# op = 1: insert | ||
CREATE MATERIALIZED VIEW error_minutes as | ||
SELECT | ||
distinct on window_start, | ||
window_start, | ||
FROM windowed_result_sub | ||
WHERE error_count > all_count * 0.9 AND op = 1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Off-topic: maybe we can introduce a static variable, e.g, |
||
|
||
CREATE MATERIALIZED VIEW detailed_msg_in_error_minutes as | ||
SELECT events.* | ||
FROM error_minutes | ||
JOIN events FOR SYSTEM_TIME AS OF PROCTIME() | ||
ON events.event_time between window_start AND window_start + INTERVAL '1 MINUTES'; | ||
``` | ||
|
||
#### Use-case 2: create sink from non-append-only MV/Table to append-only external sink | ||
```SQL | ||
CREATE TABLE t (pk int PRIMARY KEY, v int); | ||
|
||
# op = 1 insert | ||
# op = 2 delete | ||
# op = 3 update_insert | ||
# op = 4 update_delete | ||
CREATE SUBSCRIPTION sub FROM t; | ||
|
||
|
||
# create sink to a starrocks unique key table (support upsert but not delete) with schema (t, v, is_delete) by filtering out update delete | ||
CREATE SINK t_sr_unique_key_table | ||
AS SELECT t.pk as pk, t.v as v, CASE WHEN t.op = 2 THEN TRUE ELSE FALSE END as is_delete FROM sub where op != 4 | ||
WITH ( | ||
connector = 'starrocks', | ||
... | ||
) | ||
|
||
# create sink to append-only snowflake table (snowflake doesn't support streaming CDC) with schema (t, v, __delete) | ||
CREATE SINK t_snowflake_append_only | ||
AS SELECT t.pk as pk, t.v as v, CASE WHEN t.op = 2 or t.op = 4 THEN TRUE ELSE FALSE END as __delete FROM sub | ||
WITH ( | ||
connector = 'snowflake', | ||
... | ||
) | ||
``` | ||
|
||
|
||
## Alternatives | ||
https://github.com/risingwavelabs/rfcs/pull/83 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will we persist the created subscription to catalog?
It seems wired to persist it to catalog. Users were intended to only create the sink, and the subscription created here is only a sugar to express the semantic of change log, which is more like a temporary thing. If we persist it, users will have to first inspect whether there are some created subscription, and then create the subscription, and then create the sink, and if not, users can just create the subscription without any concern, and then create the sink.
If we won't persist it, maybe we can make the subscription created as a per-session temporary object, or express subscription as CTE, like
WITH sub AS SUBSCRIPTION from t create materialized view as select * from sub
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have already designed subscription semantically as a persistent object. So for now, it must be visible accross multiple sessions.
But I agree with you that we can specifically introduce a per-session, "temporary subscription" for this user case to address the inconvenience, like:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, we should need an execute to convert the op column of streamchunk into a changelog, so persistence is only a syntactic difference (do users need to create subscriptions)?