Window View got stuck at "Waiting mutation: mutation_402.txt" #315

Open
tr33oph opened this issue Mar 29, 2025 · 1 comment

tr33oph commented Mar 29, 2025

Describe the unexpected behaviour
The window view does not work the way it does on a ClickHouse server: the session gets stuck at `Waiting mutation: mutation_402.txt`.

How to reproduce

chdb==3.1.2
Python 3.10.12

Distributor ID: Ubuntu
Description: Ubuntu 22.04 LTS
Release: 22.04
Codename: jammy

```python
import random
import shutil
import threading
import time
from datetime import datetime

from chdb import session as chs

# Start from an empty on-disk session (same effect as `rm -rf tmp/test-wv.db`).
shutil.rmtree('tmp/test-wv.db', ignore_errors=True)
# Test-level logging produces the debug output quoted below.
sess = chs.Session('tmp/test-wv.db?verbose&log-level=test')
sess.query("""
set allow_experimental_window_view = 1;
set allow_experimental_analyzer = 0;
CREATE TABLE IF NOT EXISTS my_table (timestamp DateTime, value Int) ENGINE = MergeTree() ORDER BY (timestamp);

CREATE TABLE IF NOT EXISTS wv_destination (
    window_start DateTime,
    an UInt64
) ENGINE = MergeTree()
ORDER BY (window_start)
SETTINGS index_granularity = 8192;

CREATE WINDOW VIEW IF NOT EXISTS minute_window_view
TO wv_destination
WATERMARK=ASCENDING
AS
SELECT
    tumbleStart(w_id) AS window_start,
    sum(value) AS an
FROM my_table
GROUP BY tumble(timestamp, INTERVAL '10' SECOND) AS w_id;
""")

running = True


def background_thread():
    # Once per second, send one query containing 100 identical INSERT statements.
    while running:
        value = random.randint(1, 100)
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print('insert', timestamp, value)
        sess.query(f"INSERT INTO my_table (timestamp, value) VALUES ('{timestamp}', {value}) ;" * 100)
        print(f"Inserted value: {value} at {timestamp}")
        time.sleep(1)


thread = threading.Thread(target=background_thread)
thread.start()

try:
    while True:
        time.sleep(5)
except KeyboardInterrupt:
    print('ctrl+c exit')
finally:
    running = False  # ask the insert loop to stop after its current query
    print('stop')
    thread.join()
```

Expected behavior
The inserts complete and the window view keeps computing the per-window sums into `wv_destination`.
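For reference, here is a minimal pure-Python sketch of what the window view above is expected to produce: rows are grouped into 10-second tumbling windows (like `tumble(timestamp, INTERVAL '10' SECOND)` / `tumbleStart`), `value` is summed per window, and a window is only emitted once the watermark has passed its end. The function names are hypothetical, and the watermark is simplified to "largest timestamp seen so far"; ClickHouse's exact `WATERMARK=ASCENDING` rule may differ slightly, and this is not how ClickHouse implements window views.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# 10-second tumbling windows, like tumble(timestamp, INTERVAL '10' SECOND).
WINDOW = timedelta(seconds=10)


def window_start(ts: datetime) -> datetime:
    """Floor ts to the start of its 10-second window (like tumbleStart)."""
    # 10 divides 60, so flooring the seconds field is enough here.
    return ts.replace(second=ts.second - ts.second % 10, microsecond=0)


def fire_tumbling_windows(rows):
    """Sum values per window and return only the windows the watermark has closed.

    rows: iterable of (datetime, int) pairs in arrival order.
    Simplified watermark (assumption): the largest timestamp seen so far.
    """
    sums = defaultdict(int)
    watermark = None
    for ts, value in rows:
        sums[window_start(ts)] += value
        watermark = ts if watermark is None else max(watermark, ts)
    if watermark is None:
        return []
    # A window [start, start + 10s) is complete once the watermark reaches its end.
    return sorted((start, total) for start, total in sums.items()
                  if start + WINDOW <= watermark)
```

Feeding it rows at 19:22:01 (value 5), 19:22:09 (7) and 19:22:12 (3) emits only the 19:22:00 window with a sum of 12; the 19:22:10 window stays open until a later row arrives. In the repro, each such emitted row is what should end up in `wv_destination`.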

Error message and/or stacktrace

Debug log (tail):

```
...

2025.03.29 19:22:12.510268 [ 3501648 ] {} <Test> MergeTreeMarksLoader: Loading marks from path data.cmrk3
2025.03.29 19:22:12.510348 [ 3501708 ] {} <Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]:  UInt32(size = 1), column[1]:  AggregateFunction(size = 1), requested columns: windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510357 [ 3501697 ] {} <Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]:  UInt32(size = 1), column[1]:  AggregateFunction(size = 1), requested columns: windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510397 [ 3501708 ] {} <Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]:  UInt32(size = 1), column[1]:  AggregateFunction(size = 1), sample block windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510417 [ 3501697 ] {} <Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]:  UInt32(size = 1), column[1]:  AggregateFunction(size = 1), sample block windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510548 [ 3501697 ] {} <Trace> MergingAggregatedTransform: Reading blocks of partially aggregated data.
2025.03.29 19:22:12.510584 [ 3501574 ] {} <Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]:  UInt32(size = 1), column[1]:  AggregateFunction(size = 1), requested columns: windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510586 [ 3501666 ] {} <Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]:  UInt32(size = 1), column[1]:  AggregateFunction(size = 1), requested columns: windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510635 [ 3501648 ] {} <Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]:  UInt32(size = 1), column[1]:  AggregateFunction(size = 1), requested columns: windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510728 [ 3501666 ] {} <Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]:  UInt32(size = 1), column[1]:  AggregateFunction(size = 1), sample block windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510742 [ 3501574 ] {} <Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]:  UInt32(size = 1), column[1]:  AggregateFunction(size = 1), sample block windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510754 [ 3501648 ] {} <Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]:  UInt32(size = 1), column[1]:  AggregateFunction(size = 1), sample block windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.511097 [ 3501574 ] {} <Debug> MergingAggregatedTransform: Read 4 blocks of partially aggregated data, total 4 rows.
2025.03.29 19:22:12.511147 [ 3501574 ] {} <Trace> Aggregator: Merging partially aggregated single-level data.
2025.03.29 19:22:12.511206 [ 3501574 ] {} <Trace> Aggregator: Merged partially aggregated single-level data.
2025.03.29 19:22:12.511229 [ 3501574 ] {} <Trace> Aggregator: Converting aggregated data to blocks
2025.03.29 19:22:12.511349 [ 3501574 ] {} <Debug> Aggregator: Converted aggregated data to blocks. 1 rows, 16.00 B in 7.8917e-05 sec. (12671.541 rows/sec., 197.99 KiB/sec.)
2025.03.29 19:22:12.513095 [ 3501568 ] {} <Test> InterpreterInsertQuery: Pipeline could use up to 0 thread
2025.03.29 19:22:12.513838 [ 3501568 ] {} <Trace> default.wv_destination: Trying to reserve 1.00 MiB using storage policy from min volume index 0
2025.03.29 19:22:12.513910 [ 3501568 ] {} <Trace> DiskLocal: Reserved 1.00 MiB on local disk `default`, having unreserved 33.69 GiB.
2025.03.29 19:22:12.514943 [ 3501568 ] {} <Trace> MergedBlockOutputStream: filled checksums all_1_1_0 (state Temporary)
2025.03.29 19:22:12.515775 [ 3501568 ] {} <Trace> default.wv_destination: Renaming temporary part tmp_insert_all_1_1_0 to all_1_1_0 with tid (1, 1, 00000000-0000-0000-0000-000000000000).
2025.03.29 19:22:12.515939 [ 3501568 ] {} <Test> default.wv_destination: preparePartForCommit: inserting all_1_1_0 (state PreActive) into data_parts_indexes
2025.03.29 19:22:12.516533 [ 3501568 ] {} <Debug> MutationsInterpreter(default.`.inner.minute_window_view`): Will use old analyzer to prepare mutation
2025.03.29 19:22:12.517002 [ 3501566 ] {426c292f-c3cd-46a4-8e0c-2241dcec6786} <Trace> InterpreterSelectQuery: FetchColumns -> WithMergeableState
2025.03.29 19:22:12.523481 [ 3501568 ] {} <Information> default.`.inner.minute_window_view`: Added mutation: mutation_402.txt
2025.03.29 19:22:12.523548 [ 3501568 ] {} <Information> default.`.inner.minute_window_view`: Waiting mutation: mutation_402.txt
```
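When the log is long, it can help to pull out just the mutation lines. The sketch below parses the line layout seen in this excerpt (timestamp, `[ thread ]`, `{query id}`, `<Level>`, logger, message); that layout is inferred from these lines only, and the helper name `mutation_events` is hypothetical, not part of chDB.

```python
import re

# Layout of the text log lines above (inferred from this excerpt only):
# "<date> <time> [ <thread id> ] {<query id>} <<level>> <logger>: <message>"
LOG_LINE = re.compile(
    r"^(?P<ts>\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+) "
    r"\[ (?P<thread>\d+) \] "
    r"\{(?P<query_id>[^}]*)\} "
    r"<(?P<level>\w+)> "
    r"(?P<logger>.+?): (?P<message>.*)$"
)
# Messages such as "Added mutation: mutation_402.txt" or "Waiting mutation: ...".
MUTATION_MSG = re.compile(r"^(?P<action>Added|Waiting) mutation: (?P<name>\S+)$")


def mutation_events(lines):
    """Return (thread, logger, action, mutation) tuples for mutation log lines."""
    events = []
    for line in lines:
        m = LOG_LINE.match(line.rstrip("\n"))
        if not m:
            continue  # not a log line, e.g. the "..." marker
        mm = MUTATION_MSG.match(m.group("message"))
        if mm:
            events.append((m.group("thread"), m.group("logger"),
                           mm.group("action"), mm.group("name")))
    return events
```

Applied to the tail above, it returns an `Added` and then a `Waiting` event for `mutation_402.txt`, both on thread 3501568, the same thread that had just written part `all_1_1_0` into `wv_destination`; nothing follows the `Waiting` line.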

wudidapaopao self-assigned this Apr 8, 2025

wudidapaopao (Contributor) commented Apr 9, 2025

Hi @tr33oph, the default value of `background_schedule_pool_size` is currently 1. With only one thread in that pool, the mutation a window view triggers to delete its intermediate data can block, leaving the session stuck waiting on that mutation. To work around this, set `background_schedule_pool_size` to a value greater than 1, for example:

```python
sess = chs.Session('tmp/test-wv.db?background_schedule_pool_size=8')
```

Local testing has shown no blocking so far with this configuration. If further problems come up, feel free to continue the discussion here.
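To keep the repro's debug logging while applying this setting, the parameters can be combined in one session path. Below is a small hypothetical helper (not part of chDB) that builds such a path; it assumes chDB accepts several parameters joined by `&`, as the reporter's own `?verbose&log-level=test` already does.

```python
def session_path(path: str, params: dict) -> str:
    """Build a session path like 'dir?flag&key=value' (hypothetical helper).

    A value of True renders as a bare flag (e.g. "verbose");
    any other value renders as key=value. Dict order is preserved.
    """
    parts = []
    for key, value in params.items():
        if value is True:
            parts.append(key)  # bare flag, e.g. "verbose"
        else:
            parts.append(f"{key}={value}")
    return f"{path}?{'&'.join(parts)}" if parts else path
```

With the repro above, this would be used as `chs.Session(session_path('tmp/test-wv.db', {'verbose': True, 'log-level': 'test', 'background_schedule_pool_size': 8}))`, which yields `tmp/test-wv.db?verbose&log-level=test&background_schedule_pool_size=8`.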
