- 流式计算,算子有状态保留,支持流式计算
- 性能高,大多数情况下速度比polars高,内存消耗更少
- 算子丰富,内置丰富的金融算子,比如k线合成、回测、组合优化等等
- 可拓展性强,底层基于rust的
datafusion, 拓展到分布式很方便.
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple qust(只支持windows和linux)
量化框架的不可能三角:
-
高性能
-
易用
-
实盘回测一致
比如一些事件驱动的框架,优点是实盘回测一致,缺点是速度很慢, 而且不易用,毕竟操作 DataFrame 更加直观
还有一些向量化回测的框架, 优点是回测性能好,但是实盘回测不一致,而且从回测阶段转到实盘阶段比较麻烦
另外就是实盘和回测两套代码这种框架,这个不易用
总结下来就是,要想易用,就得用 DataFrame api 去做策略,要想实盘回测一致,就得用事件驱动
有没有方法能同时兼顾两者?有,用流式计算
底层用rust写就能实现高性能,api 封装成python的 DataFrame api 就能实现易用性,流式计算本身就是事件驱动,实盘回测就一致。qust的目的就是实现这个
import qust as qs
from qust import col
import polars as pl
import numpy as npn = 10
data = pl.DataFrame({
"factor": np.random.randn(n),
"code": np.random.choice(["a", "b", "c"], size=n, replace=True),
})
data_next = pl.DataFrame({
"factor": np.random.randn(n),
"code": np.random.choice(["a", "b", "c"], size=n, replace=True),
})
df = qs.with_cols(
col("factor").mean().expanding().alias("cum_mean"),
col("factor").mean().rolling(3).alias("rolling_mean"),
col("factor").mean().expanding().over("code").alias("cum_mean_over")
)print(df.calc_data(data))shape: (10, 5)
┌───────────┬──────┬───────────┬──────────────┬───────────────┐
│ factor ┆ code ┆ cum_mean ┆ rolling_mean ┆ cum_mean_over │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ f64 ┆ str ┆ f64 ┆ f64 ┆ f64 │
╞═══════════╪══════╪═══════════╪══════════════╪═══════════════╡
│ -0.071817 ┆ c ┆ -0.071817 ┆ null ┆ -0.071817 │
│ -0.039784 ┆ a ┆ -0.055801 ┆ null ┆ -0.039784 │
│ -1.593723 ┆ a ┆ -0.568442 ┆ -0.568442 ┆ -0.816754 │
│ -0.694003 ┆ b ┆ -0.599832 ┆ -0.775837 ┆ -0.694003 │
│ -0.54207 ┆ b ┆ -0.588279 ┆ -0.943265 ┆ -0.618036 │
│ -1.837696 ┆ c ┆ -0.796516 ┆ -1.02459 ┆ -0.954757 │
│ 0.111121 ┆ b ┆ -0.666853 ┆ -0.756215 ┆ -0.374984 │
│ 0.60889 ┆ c ┆ -0.507385 ┆ -0.372562 ┆ -0.433541 │
│ 0.029101 ┆ c ┆ -0.447776 ┆ 0.249704 ┆ -0.317881 │
│ 0.659319 ┆ b ┆ -0.337066 ┆ 0.432437 ┆ -0.116408 │
└───────────┴──────┴───────────┴──────────────┴───────────────┘
print(df.calc_data(data_next)) # df 里面的算子都状态保留shape: (10, 5)
┌───────────┬──────┬───────────┬──────────────┬───────────────┐
│ factor ┆ code ┆ cum_mean ┆ rolling_mean ┆ cum_mean_over │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ f64 ┆ str ┆ f64 ┆ f64 ┆ f64 │
╞═══════════╪══════╪═══════════╪══════════════╪═══════════════╡
│ 0.949717 ┆ b ┆ -0.220086 ┆ 0.546046 ┆ 0.096817 │
│ 0.866988 ┆ a ┆ -0.129496 ┆ 0.825341 ┆ -0.255506 │
│ -0.276218 ┆ a ┆ -0.140783 ┆ 0.513496 ┆ -0.260684 │
│ -0.433117 ┆ a ┆ -0.161664 ┆ 0.052551 ┆ -0.295171 │
│ 1.268484 ┆ a ┆ -0.066321 ┆ 0.186383 ┆ -0.034562 │
│ -1.898979 ┆ a ┆ -0.180862 ┆ -0.354537 ┆ -0.300907 │
│ -1.770744 ┆ a ┆ -0.274384 ┆ -0.800413 ┆ -0.484637 │
│ 1.31855 ┆ a ┆ -0.185888 ┆ -0.783724 ┆ -0.284283 │
│ -1.02516 ┆ b ┆ -0.23006 ┆ -0.492451 ┆ -0.090179 │
│ -0.175014 ┆ c ┆ -0.227308 ┆ 0.039459 ┆ -0.289307 │
└───────────┴──────┴───────────┴──────────────┴───────────────┘
data = pl.DataFrame({
"price": range(5),
"code": ["a", "a", "a", "b", "b"]
})
df = qs.with_cols(
col("price").sum().expanding().alias("cum_sum_otters"),
pl.col("price").cum_sum().alias("cum_sum_polars"),
col("price").sum().expanding().over("code").alias("cum_sum_otters_over"),
pl.col("price").cum_sum().over("code").alias("cum_sum_polars_over")
)
df.calc_data(data)| price | code | cum_sum_otters | cum_sum_polars | cum_sum_otters_over | cum_sum_polars_over |
|---|---|---|---|---|---|
| i64 | str | i64 | i64 | i64 | i64 |
| 0 | "a" | 0 | 0 | 0 | 0 |
| 1 | "a" | 1 | 1 | 1 | 1 |
| 2 | "a" | 3 | 3 | 3 | 3 |
| 3 | "b" | 6 | 6 | 3 | 3 |
| 4 | "b" | 10 | 10 | 7 | 7 |
import time
n = 2000000
data = pl.DataFrame({
"factor": np.random.randn(n),
"code": np.random.choice(["a", "b"], size=n, replace=True),
})s = time.time()
_ = qs.select(
col("factor").rank().rolling(10).over("code")
).calc_data(data)
print(f"qust: {(time.time() - s) * 1000.0}.ms")
s = time.time()
_ = data.select(
pl.col("factor").rolling_rank(10).over("code")
)
print(f"polars: {(time.time() - s) * 1000.0}.ms")qust: 105.74650764465332.ms
polars: 197.91436195373535.ms
s = time.time()
_ = qs.select(
col(*[col("factor").mean().alias(f"mean_{i}") for i in range(50)]).rolling(10).over("code")
).calc_data(data)
print(f"qust: {(time.time() - s) * 1000.0}.ms")
s = time.time()
_ = data.select(
[pl.col("factor").rolling_mean(10).over("code").alias(f"mean_{i}") for i in range(50)]
)
print(f"polars: {(time.time() - s) * 1000.0}.ms")qust: 141.4790153503418.ms
polars: 383.0065727233887.ms
class MeanUdf(qs.UdfRow):
def __init__(self):
self.sum = 0.0
self.count = 0.0
def output_schema(self, input_schema):
return [("mean_res", pl.Float64)]
def update(self, value):
self.sum += value
self.count += 1.0
def calc(self):
return [self.sum / self.count]
def retract(self, value):
self.sum -= value
self.count -= 1.0
s = time.time()
_ = qs.select(
col("factor").udf.row(MeanUdf()).rolling(10).over("code")
).calc_data(data)
print(f"qust: {(time.time() - s)}.s")
s = time.time()
_ = data.select(
pl.col("factor").rolling_map(lambda x: x.mean(), 10).over("code")
)
print(f"polars: {(time.time() - s)}.s")qust: 1.2886199951171875.s
polars: 52.34550166130066.s
| 算子 | qust | polars | 提速 |
|---|---|---|---|
| 单个算子 | 100ms | 157ms | 1.5倍 |
| 多个算子 | 110ms | 290ms | 2.5倍 |
| 自定义rolling算子 | 1.5s | 53s | 40倍 |
data = pl.DataFrame({
"value": [1, 2, 3, 4, 5]
})
data_next = pl.DataFrame({
"value": [3, 1, 10]
})qs.with_cols(
(pl.col("value") + 1).alias("value+1"),
(col("value").pl + 2).alias("value+2"),
col("value").mean().expanding().select(pl.col("value") - 1).alias("value-1")
).calc_data(data)| value | value+1 | value+2 | value-1 |
|---|---|---|---|
| i64 | i64 | i64 | f64 |
| 1 | 2 | 3 | 0.0 |
| 2 | 3 | 4 | 0.5 |
| 3 | 4 | 5 | 1.0 |
| 4 | 5 | 6 | 1.5 |
| 5 | 6 | 7 | 2.0 |
data.select(
col("value").mean().rolling(3).alias("value_mean1").pl,
col("value").pl.rolling_mean(3).alias("value_mean2"),
)| value | value_mean2 |
|---|---|
| f64 | f64 |
| null | null |
| null | null |
| 2.0 | 2.0 |
| 3.0 | 3.0 |
| 4.0 | 4.0 |
# 上面的写法没有状态保留, 如果需要状态保留,需要把算子的状态保存到全局变量,使用 `expr.cache(id)`
e = col(
col("value").mean().alias("mean"),
col("value").sum().alias("sum"),
).rolling(3)
e_pl = e.cache("unique_id").pl
# 注意这里不能接polars的over,e.cache("unique_id").pl.over("code"), 这种写法会直接报错,
# 可以写成 e.over("code").cache("unique_id").pl, 或者用 data.qs.df
data.select(e_pl)| value |
|---|
| struct[2] |
| {3.333333,10} |
| {2.666667,8} |
| {2.0,6} |
| {3.0,9} |
| {4.0,12} |
data_next.select(e_pl)| value |
|---|
| struct[2] |
| {4.0,12} |
| {3.0,9} |
| {4.666667,14} |
保存到全局的算子状态一直在内存里面,需要清除用:
qs.clear_cache("unique_id") # 单个清除
qs.clear_cache() # 全部清除由于polars的限制,上面的算子无法多列返回, 所以如果有多列返回,返回的是多列组成的struct
如果需要多列返回,只能这样写:
data.qs.select(e)| mean | sum |
|---|---|
| f64 | i64 |
| null | null |
| null | null |
| 2.0 | 6 |
| 3.0 | 9 |
| 4.0 | 12 |
df = qs.select(e)
data.qs.df(df)| mean | sum |
|---|---|
| f64 | i64 |
| null | null |
| null | null |
| 2.0 | 6 |
| 3.0 | 9 |
| 4.0 | 12 |
data_next.qs.df(df)| mean | sum |
|---|---|
| f64 | i64 |
| 4.0 | 12 |
| 3.0 | 9 |
| 4.666667 | 14 |
写量化策略的时候,一般有下面两种方法
-
向量化计算
-
事件驱动
如果策略用向量化计算,在实盘的时候就很慢,因为要重复计算历史数据, 而且很多策略没法向量化
如果策略用的事件驱动,回测的时候就很慢,而且事件驱动写法特别麻烦
流计算就是把算子都写成事件驱动的形式。比如计算移动平均,在算子里面存储两个状态 (sum, count), 每有一个行新数据value过来,更新算子的内部状态:
sum = sum + value
count = count + 1
在需要计算结果的时候就用 sum / count
data = pl.DataFrame({
"value": [1, 2, 3, 4, 5]
})
data_next = pl.DataFrame({
"value": [6, 7, 8]
})
df = qs.with_cols(
col("value").mean().rolling(3).alias("rolling_mean"),
col("value").std().expanding().alias("cum_std"),
)
print(df.calc_data(data))
shape: (5, 3)
┌───────┬──────────────┬──────────┐
│ value ┆ rolling_mean ┆ cum_std │
│ --- ┆ --- ┆ --- │
│ i64 ┆ f64 ┆ f64 │
╞═══════╪══════════════╪══════════╡
│ 1 ┆ null ┆ null │
│ 2 ┆ null ┆ 0.707107 │
│ 3 ┆ 2.0 ┆ 1.0 │
│ 4 ┆ 3.0 ┆ 1.290994 │
│ 5 ┆ 4.0 ┆ 1.581139 │
└───────┴──────────────┴──────────┘
print(df.calc_data(data_next))
shape: (3, 3)
┌───────┬──────────────┬──────────┐
│ value ┆ rolling_mean ┆ cum_std │
│ --- ┆ --- ┆ --- │
│ i64 ┆ f64 ┆ f64 │
╞═══════╪══════════════╪══════════╡
│ 6 ┆ 5.0 ┆ 1.870829 │
│ 7 ┆ 6.0 ┆ 2.160247 │
│ 8 ┆ 7.0 ┆ 2.44949 │
└───────┴──────────────┴──────────┘在第一个调用df.calc_data(data)的时候,df内部的算子都有状态保留,所以在第二个调用df.calc_data(data_next)时候,没有重新计算
实际情况是,绝大多数算子都有对应的事件驱动形式,少量的算子比如pl.col("a").rank(), 看起来不是事件驱动的形式(当前行的值受到未来行的值的影响),但是其实也可以变换成事件驱动形式,
-
转换成行算子,比如 a 列有a1,a2,a3三个元素,就是
col(a1, a2, a3).rank(axis=1) -
事件驱动形式的批算子,每次计算的时候保证传入的数据完整,比如计算
pl.col("a").rank().over("date"), 保证每次计算传入的数据包含整天的所有数据
polars不是也支持streaming吗?我看了polars的底层,觉得polars的streaming不是真正意义上的流式计算,只是为了避免out of memory,而且局限性大(比如over是用的 切割 -> 计算 -> 拼接)。如果polars要实现真正的流式计算,我估计底层得推倒重来改成datafusion的那种框架
polars的Expr用的enum, 这样就导致每实现一个算子,底层很多代码都要改, 这样就不难理解为什么一个简单的pl.col("a").rolling_rank(10)算子直到最近才实现,而且速度比我一个简单的实现慢一倍。
datafusion聚合算子用的Box<dyn trait>, 然后根据上下文选择不同路径的ExecutionPlan, 这样添加算子很方便,而且优化路径也很清晰,性能还不受影响。
polars这种写法还有个缺点,就是导致同样的逻辑写法割裂,比如求和逻辑有下面写法:
-
pl.col("a").sum() -
pl.col("a").cum_sum() -
pl.col("a").rolling_sum(10) -
df.group_by("b").agg([pl.col("a").sum()])
如果说 sum() 和 rolling_sum(10), 都是求和逻辑, 前一个是针对整列,后一个是针对滚动,但是 rank()和rolling_rank(10), 又是两个不想关的算子, 而且并不存在cum_rank()这个算子,这样逻辑就很割裂,为什么能存在cum_sum, 但是不能存在cum_rank, cum_skew, cum_cov?
相反用datafusion的上下文逻辑,写法就比较一致:
-
col("a").sum() -
col("a").sum().expanding() -
col("a").sum().rolling(10) -
col("a").sum().group_by("b")
polars 和 datafusion 对单个算子都不支持多列返回,但是datafusion提供了插件接口,能改成多列返回:
n = 7
data = pl.DataFrame({
"y": np.random.randn(n),
"x1": np.random.randn(n),
"x2": np.random.randn(n),
})
res = qs.with_cols(
col("y", "x1", "x2").stock.ols().rolling(4).add_suffix("rolling_beta"),
).calc_data(data)
print(res)
shape: (7, 5)
┌───────────┬───────────┬───────────┬─────────────────┬─────────────────┐
│ y ┆ x1 ┆ x2 ┆ x1_rolling_beta ┆ x2_rolling_beta │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ f64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
╞═══════════╪═══════════╪═══════════╪═════════════════╪═════════════════╡
│ 0.522261 ┆ -0.376497 ┆ -0.594123 ┆ null ┆ null │
│ 1.325991 ┆ -0.723979 ┆ 2.626444 ┆ null ┆ null │
│ 1.502309 ┆ -2.089571 ┆ 0.28167 ┆ null ┆ null │
│ -0.322316 ┆ 0.00877 ┆ -0.213895 ┆ -0.731707 ┆ 0.271784 │
│ -0.733964 ┆ -0.750248 ┆ -0.592936 ┆ -0.47639 ┆ 0.465733 │
│ 0.445435 ┆ -0.559213 ┆ -0.44069 ┆ -0.56446 ┆ 1.174467 │
│ 1.735427 ┆ -2.403888 ┆ 1.207053 ┆ -0.29973 ┆ 0.849167 │
└───────────┴───────────┴───────────┴─────────────────┴─────────────────┘多列返回我能想到以下好处
-
多列返回在用一些比如k线合成算子,策略信号算子之类的比较方便
-
另一个是避免用
struct, 如果底层依赖从arror-rs改成MinArrow, 估计内存占用能到原来的一半,并且耗时减少
-
支持
DataFrameApi 和 sql相互转换,polars不行 -
原生支持
arrow,datafusion是arrow的一部分,未来生态会更丰富,polars自己写了一个polars-arrow, 生态割裂 -
datafusion有成熟的分布式应用,而且全部开源,polars前期是基于datafusion的二次开发,目前分布式刚起步,而且闭源,貌似已经把主要精力放在商业闭源上面去了
qust是用rust写的一个datafusion插件,主要目的是尝试用DataFrame api去写事件驱动量化策略,并且保持向量化计算的高性能.
所以主要是添加一些能够状态保留的算子,其他一些无需状态保留的算子,还是依赖于polars的算子,比如:
col("a") + 1会报错:
TypeError: unsupported operand type(s) for +: 'Expr' and 'int'
只能用polars的算子:
qs.select(
pl.col("a") + 1,
pl.col("a").rank().over("code")
col("a").select(pl.col("a") + 1).over("code")
)当然,上面说的只是我个人的理解,对这方面有兴趣的朋友可以加我微信交流,微信号: aruster
# 从github读取tick数据
data_kline = pl.read_parquet("https://github.com/baiguoname/qust/blob/main/examples/data/300_1min_vnpy.parquet?raw=true") # 从github读取数据,速度较慢
# 假设历史数据
data_his = data_kline[:600000]
# 假设实盘数据流
data_live = [data_kline[600000:601000], data_kline[601000:602000]]
# 从github读取kine数据
data_tick = pl.read_parquet("https://github.com/baiguoname/qust/blob/main/examples/data/data_tick.parquet?raw=true")# 策略逻辑
stra = (
col(
col("close"),
col("datetime"),
col("close").stra.two_ma(10, 20), # 通过算子生成信号
)
.with_cols(col("cross_up", "cross_down").stra.to_hold_always().alias("hold")) # 通过信号生成目标持仓
)
# 回测
df_bt = qs.select(
stra.with_cols(
col("close", "hold").bt.price()
).expanding().select(
col("datetime", "pnl").fp.group_pnl()
)
)
# 实盘
df_live = qs.select(stra.expanding().select("hold").last_value())%%time
# 回测
df_bt.calc_data(data_his)CPU times: user 41.6 ms, sys: 33.7 ms, total: 75.3 ms
Wall time: 55.2 ms
| date | pnl | pnl_cum |
|---|---|---|
| date | f64 | f64 |
| 2009-01-05 | 22.14 | 22.14 |
| 2009-01-06 | -2.74 | 19.4 |
| 2009-01-07 | 20.98 | 40.38 |
| 2009-01-08 | 39.31 | 79.69 |
| 2009-01-09 | -2.92 | 76.77 |
| … | … | … |
| 2019-04-11 | 28.23 | -2400.2 |
| 2019-04-12 | -32.8 | -2433.0 |
| 2019-04-15 | 26.66 | -2406.34 |
| 2019-04-16 | 16.95 | -2389.39 |
| 2019-04-17 | -90.43 | -2479.82 |
# 实盘
import time
df_live.calc_data(data_his)
for data_live_ in data_live: # 模拟实盘数据流, 实际中应该用异步
print(f"----接收到实盘数据, 实时数据长度: {data_live_.shape[0]},开始一轮计算---")
s = time.time()
print(df_live.calc_data(data_live_))
print(f"计算完成, 耗时: {time.time() - s}")
print("------------------")
# 可以看到虽然历史数据需要几十万,但是每次实盘计算的时间很短,因为是流式计算----接收到实盘数据, 实时数据长度: 1000,开始一轮计算---
shape: (1, 1)
┌──────┐
│ hold │
│ --- │
│ f64 │
╞══════╡
│ 1.0 │
└──────┘
计算完成, 耗时: 0.002620220184326172
------------------
----接收到实盘数据, 实时数据长度: 1000,开始一轮计算---
shape: (1, 1)
┌──────┐
│ hold │
│ --- │
│ f64 │
╞══════╡
│ 1.0 │
└──────┘
计算完成, 耗时: 0.0023484230041503906
------------------
# 策略逻辑
col_tick = col("t", "c", "v", "bid1", "ask1", "bid1_v", "ask1_v")
stra = (
col(
col("c"),
col("t"),
col_tick.kline.future_ra1m.with_cols(
col("close").stra.two_ma(10, 20).filter_cb("is_finished")
),
).with_cols(
col(
col("cross_up", "c").stra.exit_by_pct(0.01, False).alias("take_profit_long"),
col("cross_up", "c").stra.exit_by_pct(0.01, True).alias("stop_loss_long"),
)
.with_cols(
(pl.col("take_profit_long") | pl.col("stop_loss_long")).alias("exit_long_sig")
),
col(
col("cross_down", "c").stra.exit_by_pct(0.01, True).alias("take_profit_short"),
col("cross_down", "c").stra.exit_by_pct(0.01, False).alias("stop_loss_short"),
)
.with_cols(
(pl.col("take_profit_short") | pl.col("stop_loss_short")).alias("exit_short_sig")
)
).with_cols(
col("cross_up", "exit_long_sig", "cross_down", "exit_short_sig")
.stra
.to_hold_two_sides()
.alias("hold")
)
)
# 价格回测
df_bt_price = (qs
.select(
stra
.with_cols(col("c", "hold").bt.price())
.expanding()
.over("ticker")
.select(col("t", "pnl").fp.group_pnl())
)
)
# tick回测
df_bt_tick = (qs.select(
col(
"bid1",
"ask1",
stra,
)
.with_cols(
col("hold", "c", "bid1", "ask1")
.bt
.tick(qs.TradePriceType.queue, qs.MatchPriceType.simnow)
# .tick(qs.TradePriceType.last_price, qs.MatchPriceType.void)
)
.expanding()
.over("ticker")
.select(col("t", "pnl").fp.group_pnl())
)
)df_bt_price.calc_data(data_tick)df_bt_tick.calc_data(data_tick)# 策略逻辑
col_tick = col("t", "c", "v", "bid1", "ask1", "bid1_v", "ask1_v")
stra = (
col(
col("c"),
col("t"),
col_tick.kline.rl5m
.with_cols(
col("close").stra.two_ma(10, 20).filter_cb("is_finished")
)
.add_suffix("m5"),
col_tick.kline.rl30m
.with_cols(
col("close").stra.two_ma(10, 20).filter_cb("is_finished")
)
.add_suffix("m30")
)
.with_cols(
col("cross_up_m30", "cross_down_m30").ffill()
)
.with_cols(
col(pl.col("cross_up_m5") & pl.col("cross_up_m30")).alias("open_long_sig"),
col(pl.col("cross_down_m5") & pl.col("cross_down_m30")).alias("open_short_sig"),
)
.with_cols(
col(
col("open_long_sig", "c").stra.exit_by_pct(0.05, False).alias("take_profit_long"),
col("open_long_sig", "c").stra.exit_by_pct(0.02, True).alias("stop_loss_long"),
)
.select(
(pl.col("take_profit_long") | pl.col("stop_loss_long")).alias("exit_long_sig")
),
col(
col("open_short_sig", "c").stra.exit_by_pct(0.05, True).alias("take_profit_short"),
col("open_short_sig", "c").stra.exit_by_pct(0.02, False).alias("stop_loss_short"),
)
.select(
(pl.col("take_profit_short") | pl.col("stop_loss_short")).alias("exit_short_sig")
)
)
.with_cols(
col("open_long_sig", "exit_long_sig", "open_short_sig", "exit_short_sig")
.stra
.to_hold_two_sides()
.alias("hold")
)
)
# tick回测逻辑
df_bt_tick = (
qs.select(
col(
"bid1",
"ask1",
stra,
)
.with_cols(
col("hold", "c", "bid1", "ask1")
.bt
.tick(qs.TradePriceType.queue, qs.MatchPriceType.simnow)
# .backtest_tick(qs.TradePriceType.last_price, qs.MatchPriceType.void)
)
.expanding()
.over("ticker")
.select(
col("t", "pnl").fp.group_pnl()
)
)
)df_bt_tick.calc_data(data_tick)(qs
.with_cols(
col("high", "low", "close").stra.c66()
)
.select(pl.all().qs.fp.bt(fee_rate = 0.0000)) # 只是做示例用,没有手续费
.calc_data(data_kline)
.qs
._line()
)(qs
.with_cols(
col("t", "c", "v", "bid1", "ask1", "bid1_v", "ask1_v")
.kline
.rl1m
.expanding()
)
.with_cols(
col("high", "low", "close")
.stra
.c66()
.filter_cb("is_finished")
)
.with_cols(
col("c", "v", "bid1", "ask1")
.ta
.order_flow_gap
.rolling(2000)
.with_cols(
(pl.col("of_gap") > 100).alias("open_long_sig_of"),
(pl.col("of_gap") < -100).alias("open_short_sig_of"),
)
)
.with_cols(
(pl.col("open_long_sig") & pl.col("open_long_sig_of")).alias("open_long_sig"),
(pl.col("open_short_sig") & pl.col("open_short_sig_of")).alias("open_short_sig"),
)
.select(pl.all().qs.fp.bt(-1.0, 1.0))
.calc_data(data_tick)
)qust底层用的rust,性能有保障,但是不能也没有必要覆盖所有的情况,所以自定义算子很重要。
实现一个自定义的算子之后,这个算子就能像内置算子那样在各种上下文计算,比如rolling, group_by, over之类
虽然目前自定义的行算子比polars要高出很多倍(见上面的测试),但是毕竟比rust慢,所以最好是在策略的最后阶段比如仓位管理之类的时候去自定义行算子
# 例子1,自定义一个均线计算
class MeanUdf(qs.UdfRow):
def __init__(self):
# 在内部做状态保留
self.sum = 0.0
self.count = 0.0
def output_schema(self, input_schema):
return [("mean_res", pl.Float64)]
# 更新数据
# value来自于输入的每一行
# col("a") => update(self, a_value)
# col("a", "b") => update(self, a_value, b_value)
def update(self, value):
self.sum += value
self.count += 1.0
# 计算结果,必须要返回一个list
def calc(self):
return [self.sum / self.count]
# 如果需要支持rolling,必须要实现这个方法,说明怎么滚动
def retract(self, value):
self.sum -= value
self.count -= 1.0
import numpy as np
n = 1000
data_test = pl.DataFrame({
"value": np.random.randn(n),
"code": np.random.choice(["a", "b", "c"], size=n, replace=True),
"window": np.random.choice([10, 5, 2], size = n, replace=True),
"intra_day": np.random.choice([True, False], size = n, replace=True)
})
e = col("value").udf.row(MeanUdf())
# 自定义的行算子可以在各类上下文中使用
qs.with_cols(
e.expanding().alias("expanding"),
e.rolling(10).alias("rolling"),
e.rolling_dynamic("window").alias("rolling_dynamic"),
e.rolling_intra_day("intra_day", 3).alias("rolling_intraday"),
e.expanding().alias("expanding").over("code").add_suffix("over"),
e.rolling(10).alias("rolling").over("code").add_suffix("over"),
e.rolling_dynamic("window").alias("rolling_dynamic").add_suffix("over"),
e.rolling_intra_day("intra_day", 3).alias("rolling_intraday").add_suffix("over"),
).calc_data(data_test)| value | code | window | intra_day | expanding | rolling | rolling_dynamic | rolling_intraday | expanding_over | rolling_over | rolling_dynamic_over | rolling_intraday_over |
|---|---|---|---|---|---|---|---|---|---|---|---|
| f64 | str | i64 | bool | f64 | f64 | f64 | f64 | f64 | f64 | f64 | f64 |
| 1.473805 | "c" | 10 | false | 1.473805 | null | 1.473805 | null | 1.473805 | null | 1.473805 | null |
| 0.901422 | "a" | 2 | false | 1.187613 | null | 1.187613 | null | 0.901422 | null | 1.187613 | null |
| -0.169569 | "c" | 10 | false | 0.735219 | null | 0.735219 | null | 0.652118 | null | 0.735219 | null |
| 1.372656 | "a" | 10 | true | 0.894578 | null | 0.894578 | null | 1.137039 | null | 0.894578 | null |
| 0.526456 | "c" | 10 | false | 0.820954 | null | 0.820954 | null | 0.610231 | null | 0.820954 | null |
| … | … | … | … | … | … | … | … | … | … | … | … |
| -1.635003 | "a" | 10 | false | -0.056636 | -0.383553 | -0.383553 | -0.544177 | -0.128022 | 0.021579 | -0.383553 | -0.544177 |
| -0.165867 | "c" | 2 | false | -0.056745 | -0.443752 | -0.900435 | -0.054465 | -0.026123 | -0.426566 | -0.900435 | -0.054465 |
| 0.563975 | "b" | 10 | true | -0.056124 | -0.372388 | -0.372388 | 0.188816 | -0.017073 | -0.046329 | -0.372388 | 0.188816 |
| 0.013646 | "a" | 5 | true | -0.056054 | -0.30627 | -0.612689 | -0.420858 | -0.127587 | 0.047363 | -0.612689 | -0.420858 |
| 0.779019 | "a" | 5 | true | -0.055219 | -0.166259 | -0.088846 | 0.452213 | -0.124815 | 0.071643 | -0.088846 | 0.452213 |
# 例子2,利用自定义行算子实现一个马丁策略
class MartinGillStra(qs.UdfRow):
# 用python自定义行算子,实现一个马丁格尔策略
# 策略的输入
# -----
# col(price, line_down_std2, line_down_std1, line_middle, line_up_std1, line_up_std2)
# price: k线价格
# line_down_std2: k线形成的最低线
# line_down_std1: k线形成的中下线
# line_middle: k线的中间线
# line_up_std1: k线形成的中上线
# line_up_std2: k线形成的最高线
# 策略逻辑
# ------
# price 处于 [line_middle, line_up_std1], 目标仓位1,
# price 处于 [line_middle, line_up_std2], 目标仓位2,
# price 处于 [line_up_std2, inf], 目标仓位3,
# 反过来就是 -1, -2, -3
# 输出
# ------
# col(target)
# target: 目标仓位
def __init__(self):
self.last_hold = 0.0
def output_schema(self, input_schema):
return [("hold", pl.Float64)]
def update(self, price, down_std2, down_std1, middle, up_std1, up_std2):
# 如果能够保证输入没有null,可以去除这个检查,性能有提升
if price is None or down_std2 is None or down_std1 is None or middle is None or up_std1 is None or up_std2 is None:
return None
if price <= down_std2:
self.last_hold = -3.0
elif down_std2 < price <= down_std1:
self.last_hold = -2.0
elif down_std1 < price <= middle:
self.last_hold = -1.0
elif middle < price < up_std1:
self.last_hold = 1.0
elif up_std1 <= price < up_std2:
self.last_hold = 2.0
elif up_std2 <= price:
self.last_hold = 3.0
def calc(self):
return [self.last_hold]
(qs
.with_cols(
col(
col("close").alias("price"),
col("close").mean().alias("middle"),
col("close").std().alias("std"),
)
.rolling(20, 1)
.select(
col(
"price",
(pl.col("middle") - 2.0 * pl.col("std")).alias("a"),
(pl.col("middle") - 1.0 * pl.col("std")).alias("b"),
"middle",
(pl.col("middle") + 1.0 * pl.col("std")).alias("c"),
(pl.col("middle") + 2.0 * pl.col("std")).alias("d"),
)
.udf
.row(MartinGillStra())
)
.expanding()
)
.with_cols(
col("close", "hold")
.bt
.price(fee_rate=0.0002)
.expanding()
)
.select(
col("datetime", "pnl").fp.group_pnl()
)
.calc_data(data_kline)
)