Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/common/src/msg/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -13188,7 +13188,7 @@ int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq, bo
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId));
if (pReq->pStRtFuncInfo) {
TAOS_CHECK_EXIT(tEncodeI32(&encoder, 1));
TAOS_CHECK_EXIT(tSerializeStRtFuncInfo(&encoder, pReq->pStRtFuncInfo, pReq->reset && needStreamPesudoFuncVals));
TAOS_CHECK_EXIT(tSerializeStRtFuncInfo(&encoder, pReq->pStRtFuncInfo, /* pReq->reset && */ needStreamPesudoFuncVals));
} else {
TAOS_CHECK_EXIT(tEncodeI32(&encoder, 0));
}
Expand Down
3 changes: 2 additions & 1 deletion source/libs/executor/src/externalwindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1825,7 +1825,7 @@ static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes)
blockDataCleanup(pBlock);
taosArrayClear(pExtW->pWinRowIdx);

for (; pExtW->outputWinId < pExtW->pWins->size; pExtW->outputWinId += 1) {
for (; pExtW->outputWinId < pExtW->pWins->size; ++pExtW->outputWinId) {
SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->outputWinId);
int32_t winIdx = pWin->winOutIdx;
if (winIdx < 0) {
Expand Down Expand Up @@ -1856,6 +1856,7 @@ static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes)
TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows));

if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
++pExtW->outputWinId;
break;
}
}
Expand Down
156 changes: 156 additions & 0 deletions test/cases/18-StreamProcessing/02-Stream/stream_fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import time
import math
import random
from new_test_framework.utils import tdLog, tdSql, tdStream, streamUtil,StreamTableType, StreamTable, cluster
from random import randint
import os
import subprocess

class TestStreamFetch:
def setup_class(cls):
tdLog.debug(f"start to execute {__file__}")

def test_stream_fetch(self):
"""Stream fetch data from runner

1. Check stream interval result


Since: v3.3.8.11

Labels: common,ci

Jira: None

History:
- 2026-01-06 Stephen Jin Created

"""


self.prepareData()
# create the interval stream
self.createIntervalStream()
# check the interval stream result
self.checkIntervalResult()
self.prepareCountData()
self.createCountStream()
self.checkCountResult()

def prepareData(self):
tdLog.info(f"prepare data")

sqls = [
"alter dnode 1 'debugflag 135';",
"create snode on dnode 1;",
"create database db vgroups 4;",
"create table db.meters (ts timestamp, current int) tags(`groupid` int);"
]

ts = 1767196800000 # int(time.time())*1000
for t in range(0, 4):
sqls.append(f"create table db.d{t} using db.meters tags({t})")
for i in range(0, 12000000, 1000):
sqls.append(f"insert into db.d{t} values({ts+i},{i})")

tdSql.executes(sqls)
tdLog.info(f"create successfully.")

def createIntervalStream(self):
tdLog.info(f"create stream:")
sql = (
f"create stream db.meters_stream_interval_3 interval(3s) sliding(3s) from db.meters partition by tbname ,groupid stream_options (fill_history ('2026-01-01 00:00:00') |event_type (window_close) |force_output |pre_filter (tbname in ('d0','d1','d2'))) into db.stream_meters_interval output_subtable (concat('run_s_3#', tbname)) tags (point_name varchar(128) as '全椒2#窑运行状态', point_id varchar(128) as 'run_state_2#') as select _twend as ts, last(case when tbname = 'd0' then current end) as kilnrun, last(case when tbname = 'd1' then current end) as flowfeedback, last(case when tbname = 'd2' then current end) as fanrun, cast(case when last(case when tbname = 'd0' then current end) = 1.0 and last(case when tbname = 'd1' then current end) > 120.0 and last(case when tbname = 'd2' then current end) = 1.0 then 1 else 0 end as int) as run_state from db.meters where tbname in ('d0','d1','d2') and ts >= _twstart and ts < _twend;"
)

tdLog.info(f"create stream:{sql}")

try:
tdSql.execute(sql)
except Exception as e:
if "No stream available snode now" not in str(e):
raise Exception(f" user cant create stream no snode ,but create success")

while True:
tdSql.query(f"select status from information_schema.ins_streams")
if tdSql.getData(0,0) == "Running":
tdLog.info("Stream is running!")
break

tdLog.debug(f"current stream status: {tdSql.getData(0,0)}")
time.sleep(1)

def checkIntervalResult(self):
tdLog.info(f"checkIntervalResult start")

while True:
tdSql.query(f"select count(*) from db.`run_s_3#d0`")
if tdSql.getData(0,0) == 4000:
tdLog.info(f"get {tdSql.getData(0,0)} rows")
break

tdLog.debug(f"current row count: {tdSql.getData(0,0)}")
time.sleep(1)

tdSql.query(f"select * from db.`run_s_3#d0` order by ts limit 3065,10")
tdSql.checkData(7, 1, 9218000)

tdLog.info(f"check stream result successfully.")

def prepareCountData(self):
tdLog.info(f"prepare count data")

sqls = [
"alter dnode 1 'debugflag 135';",
"drop database db",
"create database db vgroups 4;",
"create table db.meters (ts timestamp, current int) tags(`groupid` int);"
]

ts = 1767196800000 # int(time.time())*1000
for t in range(0, 4):
sqls.append(f"create table db.d{t} using db.meters tags({t})")
for i in range(0, 4000000, 1000):
sqls.append(f"insert into db.d{t} values({ts+i},{i})")

tdSql.executes(sqls)
tdLog.info(f"create successfully.")

def createCountStream(self):
tdLog.info(f"create stream:")
sql = (
f"create stream db.meters_stream_count count_window(2,1) from db.meters partition by tbname ,groupid stream_options (fill_history ('2026-01-01 00:00:00') |low_latency_calc|watermark(10s)) into db.stream_meters_count output_subtable (concat('run_c_3#', tbname)) tags (groupid int as groupid) as select _twstart as ts, last(current) as now_batch, first(current) as previous_batch, _twstart as starttime, _twend as endtime from %%tbname where ts >= _twstart and ts<=_twend "
)

tdLog.info(f"create stream:{sql}")

try:
tdSql.execute(sql)
except Exception as e:
if "No stream available snode now" not in str(e):
raise Exception(f" user cant create stream no snode ,but create success")

while True:
tdSql.query(f"select status from information_schema.ins_streams")
if tdSql.getData(0,0) == "Running":
tdLog.info("Stream is running!")
break

tdLog.debug(f"current stream status: {tdSql.getData(0,0)}")
time.sleep(1)

def checkCountResult(self):
tdLog.info(f"checkIntervalResult start")

while True:
tdSql.query(f"select count(*) from db.`run_c_3#d0`")
if tdSql.getData(0,0) == 3999:
tdLog.info(f"get {tdSql.getData(0,0)} rows")
break

tdLog.debug(f"current row count: {tdSql.getData(0,0)}")
time.sleep(1)

tdSql.query(f"select * from db.`run_c_3#d0` order by ts limit 3065,10")
tdSql.checkData(7, 1, 3073000)

tdLog.info(f"check stream result successfully.")
1 change: 1 addition & 0 deletions test/ci/cases.task
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/test_stream_window_query.py
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/test_stream_output_table.py
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/stream_drop.py
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/stream_fetch.py
## 03-TriggerMode
#,,n,.,pytest cases/18-StreamProcessing/03-TriggerMode/test_count_new.py
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_count.py
Expand Down
Loading