【Python】WonderTrader高频交易初探及v0.6发布

WonderTrader高频交易初探及v0.6发布

WonderTrader发布于 20 分钟前

【Python】WonderTrader高频交易初探及v0.6发布

前言

自从WonderTrader实现了HFT策略引擎以来,一直都没有时间彻底的将高频策略研发、回测、仿真、实盘整个流程彻底走通一遍。所以趁着最近公司要上高频的机会,笔者基于WonderTrader把高频策略的应用彻底梳理了一遍。

本文的主要目的就是帮助用户初步了解WonderTraderHFT引擎上如何开发策略的。

平台准备

之前实盘框架下的HFT引擎已经基本完成,但是回测框架下的HFT策略的支持因为事情太多一直没有完善。这次彻底梳理HFT引擎,正好把回测部分也完善了一下。HFT回测引擎完善之后,WonderTrader也正好发布一个新版本v0.6.0。

高频模型介绍

本文采用的高频模型,源自Darryl ShenLinacre College University of Oxford)于2015年5月27日发表的《Order Imbalance Based Strategy in High Frequency Trading》一文(网络上可以找到)。

该模型基于Ltick数据中的委托量的不平衡因子、委比因子以及中间价回归因子三个因子,预测t0时刻之后的ktick数据的中间价的均价变化量,并以此构建线性模型。通过线性回归,得到各个因子的系数。线性方程如下:
【Python】WonderTrader高频交易初探及v0.6发布

方程中各个符号的具体含义,请感兴趣的读者自行检索。该文中使用2014年IF主力合约全年的tick进行回测,每次进出场以1手股指为单位,可以实现92.6% 的胜率,最优参数下,年化夏普率可以达到7.243,日均P&L在58600元。

模型实现

有了模型以后,我们开始来编写代码实现。因为本文旨在介绍HFT策略开发的流程,为了降低读者理解难度,策略都采用Python编写。

策略框架介绍

首先我们来看一下一个高频策略的基本结构:

class BaseHftStrategy:

'''

HFT策略基础类,所有的策略都从该类派生\n

包含了策略的基本开发框架

'''

def __init__(self, name):

self.__name__ = name

def name(self):

return self.__name__

def on_init(self, context:HftContext):

'''

策略初始化,启动的时候调用\n

用于加载自定义数据\n

@context 策略运行上下文

'''

return

def on_tick(self, context:HftContext, stdCode:str, newTick:dict):

'''

Tick数据进来时调用\n

@context 策略运行上下文\n

@stdCode 合约代码\n

@newTick 最新Tick

'''

return

def on_order_detail(self, context:HftContext, stdCode:str, newOrdQue:dict):

'''

逐笔委托数据进来时调用\n

@context 策略运行上下文\n

@stdCode 合约代码\n

@newOrdQue 最新逐笔委托

'''

return

def on_order_queue(self, context:HftContext, stdCode:str, newOrdQue:dict):

'''

委托队列数据进来时调用\n

@context 策略运行上下文\n

@stdCode 合约代码\n

@newOrdQue 最新委托队列

'''

return

def on_transaction(self, context:HftContext, stdCode:str, newTrans:dict):

'''

逐笔成交数据进来时调用\n

@context 策略运行上下文\n

@stdCode 合约代码\n

@newTrans 最新逐笔成交

'''

return

def on_bar(self, context:HftContext, stdCode:str, period:str, newBar:dict):

'''

K线闭合时回调

@context 策略上下文\n

@stdCode 合约代码

@period K线周期

@newBar 最新闭合的K线

'''

return

def on_channel_ready(self, context:HftContext):

'''

交易通道就绪通知\n

@context 策略上下文\n

'''

return

def on_channel_lost(self, context:HftContext):

'''

交易通道丢失通知\n

@context 策略上下文\n

'''

return

def on_entrust(self, context:HftContext, localid:int, stdCode:str, bSucc:bool, msg:str, userTag:str):

'''

下单结果回报

@context 策略上下文\n

@localid 本地订单id\n

@stdCode 合约代码\n

@bSucc 下单结果\n

@mes 下单结果描述

'''

return

def on_order(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, totalQty:float, leftQty:float, price:float, isCanceled:bool, userTag:str):

'''

订单回报

@context 策略上下文\n

@localid 本地订单id\n

@stdCode 合约代码\n

@isBuy 是否买入\n

@totalQty 下单数量\n

@leftQty 剩余数量\n

@price 下单价格\n

@isCanceled 是否已撤单

'''

return

def on_trade(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, qty:float, price:float, userTag:str):

'''

成交回报

@context 策略上下文\n

@stdCode 合约代码\n

@isBuy 是否买入\n

@qty 成交数量\n

@price 成交价格

'''

return

整个策略的结构大致可以分为四块:

  • 策略本身的回调
  • 行情数据的回调
  • 交易通道的回调
  • 交易回报的回调

其中行情数据的回调,主要包括on_tickon_barlevel2数据回调,本文中只需要关注on_tick即可;交易通道的回调,主要是通知策略交易通道的连接和断开事件;交易回报的回调,主要是订单回报、成交回报以及下单回报。

参数设计

根据模型的逻辑,我们设置回溯tick数为5,中间价变动的阈值为0.3,那么我们便可以将策略参数设计如下:

'''交易参数'''

self.__code__ = code #交易合约

self.__expsecs__ = expsecs #订单超时秒数,用于控制超时撤单

self.__freq__ = freq #交易频率控制,指定时间内限制信号数,单位秒

self.__lots__ = lots #单次交易手数

self.count = count #回溯tick条数

self.beta_0 = beta_0 #常量系数+残差

self.beta_r = beta_r #中间价回归因子系数

self.threshold = threshold #中间价变动阈值

self.beta_oi = beta_oi #成交量不平衡因子系数序列

self.beta_rou = beta_rou #委比因子系数序列

self.active_secs = active_secs #交易时间区间

self.stoppl = stoppl #止盈止损配置

核心逻辑

在大致了解了HFT策略的结构以后,我们就可以开始来编码了。整个策略的核心逻辑,集中在on_tick回调中,主要就是上述模型的计算,代码如下:

hisTicks = context.stra_get_ticks(self.__code__, self.count + 1)

if hisTicks.size != self.count+1:

return

if (len(newTick["askprice"]) == 0) or (len(newTick["bidprice"]) == 0):

return

spread = newTick["askprice"][0] - newTick["bidprice"][0]

total_OIR = 0.0

total_rou = 0.0

# 计算不平衡因子和委比因子的累加之和

for i in range(1, self.count + 1):

prevTick = hisTicks.get_tick(i-1)

curTick = hisTicks.get_tick(i)

lastBidPx = self.get_price(prevTick, -1)

lastAskPx = self.get_price(prevTick, 1)

lastBidQty = prevTick["bidqty"][0] if len(prevTick["bidqty"]) > 0 else 0

lastAskQty = prevTick["askqty"][0] if len(prevTick["askqty"]) > 0 else 0

curBidPx = self.get_price(curTick, -1)

curAskPx = self.get_price(curTick, 1)

curBidQty = curTick["bidqty"][0] if len(curTick["bidqty"]) > 0 else 0

curAskQty = curTick["askqty"][0] if len(curTick["askqty"]) > 0 else 0

delta_vb = 0.0

delta_va = 0.0

if curBidPx < lastBidPx:

delta_vb = 0.0

elif curBidPx == lastBidPx:

delta_vb = curBidQty - lastBidQty

else:

delta_vb = curBidQty

if curAskPx < lastAskPx:

delta_va = curAskQty

elif curAskPx == lastAskPx:

delta_va = curAskQty - lastAskQty

else:

delta_va = 0.0

voi = delta_vb - delta_va

total_OIR += self.beta_oi[i-1]*voi/spread

#计算委比

rou = (curBidQty - curAskQty)/(curBidQty + curAskQty)

total_rou += self.beta_rou[i-1]*rou/spread

prevTick = hisTicks.get_tick(-2)

# t-1时刻的中间价

prevMP = (self.get_price(prevTick, -1) + self.get_price(prevTick, 1))/2

# 最新的中间价

curMP = (newTick["askprice"][0] + newTick["bidprice"][0])/2

# 两个快照之间的成交均价

if newTick["volumn"] != 0:

avgTrdPx = newTick["turn_over"]/newTick["volumn"]/self.__comm_info__.volscale

elif self._last_atp__!= 0:

avgTrdPx = self._last_atp__

else:

avgTrdPx = curMP

self._last_atp__ = avgTrdPx

# 计算中间价回归因子

curR = avgTrdPx - (prevMP + curMP) / 2

# 计算预期中间价变化量

efpc = self.beta_0 + total_OIR + total_rou + self.beta_r * curR / spread

if efpc >= self.threshold:

targetPos = self.__lots__

diffPos = targetPos - curPos

if diffPos != 0.0:

targetPx = newTick["askprice"][0]

ids = context.stra_buy(self.__code__, targetPx, abs(diffPos), "enterlong")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

self.__last_entry_time__ = now

self._max_dyn_prof = 0

self._max_dyn_loss = 0

elif efpc <= -self.threshold:

targetPos = -self.__lots__

diffPos = targetPos - curPos

if diffPos != 0:

targetPx = newTick["bidprice"][0]

ids = context.stra_sell(self.__code__, targetPx, abs(diffPos), "entershort")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

self.__last_entry_time__ = now

self._max_dyn_prof = 0.0

self._max_dyn_loss = 0.0

止盈止损逻辑

但是对于高频策略,除了核心的进出场逻辑之外,止盈止损逻辑也是非常重要的一部分。本文中的策略采用固定点位止损+跟踪止盈来作为止盈止损逻辑,代码如下:

# 止盈止损逻辑

if curPos != 0 and self.stoppl["active"]:

isLong = (curPos > 0)

# 首先获取最新的价格,calc_price为0的话,使用对手价计算浮盈,calc_price为1的话,使用最新价计算浮盈

price = 0

if self.stoppl["calc_price"] == 0:

price = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)

else:

price = newTick["price"]

#然后计算浮动盈亏的跳数

diffTicks = (price - self._last_entry_price)*(1 if isLong else -1) / self.__comm_info__.pricetick

if diffTicks > 0:

self._max_dyn_prof = max(self._max_dyn_prof, diffTicks)

else:

self._max_dyn_loss = min(self._max_dyn_loss, diffTicks)

bNeedExit = False

usertag = ''

stop_ticks = self.stoppl["stop_ticks"]

track_threshold = self.stoppl["track_threshold"]

fallback_boundary = self.stoppl["fallback_boundary"]

if diffTicks <= stop_ticks:

context.stra_log_text("浮亏%.0f超过%d跳,止损离场" % (diffTicks, stop_ticks))

bNeedExit = True

usertag = "stoploss"

elif self._max_dyn_prof >= track_threshold and diffTicks <= fallback_boundary:

context.stra_log_text("浮赢回撤%.0f->%.0f[阈值%.0f->%.0f],止盈离场" % (self._max_dyn_prof, diffTicks, track_threshold, fallback_boundary))

bNeedExit = True

usertag = "stopprof"

if bNeedExit:

targetprice = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)

ids = context.stra_sell(self.__code__, targetprice, abs(curPos), usertag) if isLong else context.stra_buy(self.__code__, price, abs(curPos), usertag)

for localid in ids:

self.__orders__[localid] = localid

# 出场逻辑执行以后结束逻辑

return

收盘前出场的逻辑

有了止盈止损逻辑,我们还需要添加一段收盘前出场的逻辑,代码如下:

curMin = context.stra_get_time()

curPos = context.stra_get_position(stdCode)

# 不在交易时间,则检查是否有持仓

# 如果有持仓,则需要清理

if not self.is_active(curMin):

self._last_atp__ = 0.0

if curPos == 0:

return

self.__to_clear__ = True

else:

self.__to_clear__ = False

# 如果需要清理持仓,且不在撤单过程中

if self.__to_clear__ :

if self.__cancel_cnt__ == 0:

if curPos > 0:

# 以对手价挂单

targetPx = self.get_price(newTick, -1)

ids = context.stra_sell(self.__code__, targetPx, abs(curPos), "deadline")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

elif curPos < 0:

# 以对手价挂单

targetPx = self.get_price(newTick, 1)

ids = context.stra_buy(self.__code__, targetPx, abs(curPos), "deadline")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

return

订单管理逻辑

然后,我们还需要添加一段订单管理的逻辑,代码如下:

def check_orders(self, ctx:HftContext):

#如果未完成订单不为空

ord_cnt = len(self.__orders__.keys())

if ord_cnt > 0 and self.__last_entry_time__ is not None:

#当前时间,一定要从api获取,不然回测会有问题

now = makeTime(ctx.stra_get_date(), ctx.stra_get_time(), ctx.stra_get_secs())

span = now - self.__last_entry_time__

total_secs = span.total_seconds()

if total_secs >= self.__expsecs__: #如果订单超时,则需要撤单

ctx.stra_log_text("%d条订单超时撤单" % (ord_cnt))

for localid in self.__orders__:

ctx.stra_cancel(localid)

self.__cancel_cnt__ += 1

ctx.stra_log_text("在途撤单数 -> %d" % (self.__cancel_cnt__))

其他逻辑

除了上述的逻辑之外,我们还需要处理一些细节问题,如:

  • 处理订单回报,用于更新本地订单的状态;
  • 处理成交回报,用于更新入场价格,计算浮动盈亏
  • 处理交易通道就绪的回报,用于检查是否有不在管理内的未完成单

完整源码

整个策略的完整代码如下:

from wtpy import BaseHftStrategy

from wtpy import HftContext

from datetime import datetime

def makeTime(date:int, time:int, secs:int):

'''

将系统时间转成datetime\n

@date 日期,格式如20200723\n

@time 时间,精确到分,格式如0935\n

@secs 秒数,精确到毫秒,格式如37500

'''

return datetime(year=int(date/10000), month=int(date%10000/100), day=date%100,

hour=int(time/100), minute=time%100, second=int(secs/1000), microsecond=secs%1000*1000)

class HftStraOrderImbalance(BaseHftStrategy):

def __init__(self, name:str, code:str, count:int, lots:int, beta_0:float, beta_r:float, threshold:float,

beta_oi:list, beta_rou:list, expsecs:int, offset:int, freq:int, active_secs:list, stoppl:dict, reserve:int=0):

BaseHftStrategy.__init__(self, name)

'''交易参数'''

self.__code__ = code #交易合约

self.__expsecs__ = expsecs #订单超时秒数,用于控制超时撤单

self.__freq__ = freq #交易频率控制,指定时间内限制信号数,单位秒

self.__lots__ = lots #单次交易手数

self.count = count #回溯tick条数

self.beta_0 = beta_0 #常量系数+残差

self.beta_r = beta_r #中间价回归因子系数

self.threshold = threshold #中间价变动阈值

self.beta_oi = beta_oi #成交量不平衡因子系数序列

self.beta_rou = beta_rou #委比因子系数序列

self.active_secs = active_secs #交易时间区间

self.stoppl = stoppl #止盈止损配置

'''内部数据'''

self.__last_tick__ = None #上一笔行情

self.__orders__ = dict() #策略相关的订单

self.__last_entry_time__ = None #上次入场时间

self.__cancel_cnt__ = 0 #正在撤销的订单数

self.__channel_ready__ = False #通道是否就绪

self.__comm_info__ = None

self.__to_clear__ = False

self._last_entry_price = 0.0

self._max_dyn_prof = 0.0

self._max_dyn_loss = 0.0

self._last_atp__ = 0.0

def is_active(self, curMin:int) -> bool:

for sec in self.active_secs:

if sec["start"] <= curMin and curMin <= sec["end"]:

return True

return False

def on_init(self, context:HftContext):

'''

策略初始化,启动的时候调用\n

用于加载自定义数据\n

@context 策略运行上下文

'''

self.__comm_info__ = context.stra_get_comminfo(self.__code__)

#先订阅实时数据

context.stra_sub_ticks(self.__code__)

self.__ctx__ = context

def check_orders(self, ctx:HftContext):

#如果未完成订单不为空

ord_cnt = len(self.__orders__.keys())

if ord_cnt > 0 and self.__last_entry_time__ is not None:

#当前时间,一定要从api获取,不然回测会有问题

now = makeTime(ctx.stra_get_date(), ctx.stra_get_time(), ctx.stra_get_secs())

span = now - self.__last_entry_time__

total_secs = span.total_seconds()

if total_secs >= self.__expsecs__: #如果订单超时,则需要撤单

ctx.stra_log_text("%d条订单超时撤单" % (ord_cnt))

for localid in self.__orders__:

ctx.stra_cancel(localid)

self.__cancel_cnt__ += 1

ctx.stra_log_text("在途撤单数 -> %d" % (self.__cancel_cnt__))

def get_price(self, newTick, pricemode=0):

if pricemode == 0:

return newTick["price"]

elif pricemode == 1:

return newTick["askprice"][0] if len(newTick["askprice"])>0 else newTick["price"]

elif pricemode == -1:

return newTick["bidprice"][0] if len(newTick["bidprice"])>0 else newTick["price"]

def on_tick(self, context:HftContext, stdCode:str, newTick:dict):

if self.__code__ != stdCode:

return

#如果有未完成订单,则进入订单管理逻辑

if len(self.__orders__.keys()) != 0:

self.check_orders(context)

return

if not self.__channel_ready__:

return

curMin = context.stra_get_time()

curPos = context.stra_get_position(stdCode)

# 不在交易时间,则检查是否有持仓

# 如果有持仓,则需要清理

if not self.is_active(curMin):

self._last_atp__ = 0.0

if curPos == 0:

return

self.__to_clear__ = True

else:

self.__to_clear__ = False

# 如果需要清理持仓,且不在撤单过程中

if self.__to_clear__ :

if self.__cancel_cnt__ == 0:

if curPos > 0:

targetPx = self.get_price(newTick, -1)

ids = context.stra_sell(self.__code__, targetPx, abs(curPos), "deadline")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

elif curPos < 0:

targetPx = self.get_price(newTick, 1)

ids = context.stra_buy(self.__code__, targetPx, abs(curPos), "deadline")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

return

# 止盈止损逻辑

if curPos != 0 and self.stoppl["active"]:

isLong = (curPos > 0)

price = 0

if self.stoppl["calc_price"] == 0:

price = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)

else:

price = newTick["price"]

diffTicks = (price - self._last_entry_price)*(1 if isLong else -1) / self.__comm_info__.pricetick

if diffTicks > 0:

self._max_dyn_prof = max(self._max_dyn_prof, diffTicks)

else:

self._max_dyn_loss = min(self._max_dyn_loss, diffTicks)

bNeedExit = False

usertag = ''

stop_ticks = self.stoppl["stop_ticks"]

track_threshold = self.stoppl["track_threshold"]

fallback_boundary = self.stoppl["fallback_boundary"]

if diffTicks <= stop_ticks:

context.stra_log_text("浮亏%.0f超过%d跳,止损离场" % (diffTicks, stop_ticks))

bNeedExit = True

usertag = "stoploss"

elif self._max_dyn_prof >= track_threshold and diffTicks <= fallback_boundary:

context.stra_log_text("浮赢回撤%.0f->%.0f[阈值%.0f->%.0f],止盈离场" % (self._max_dyn_prof, diffTicks, track_threshold, fallback_boundary))

bNeedExit = True

usertag = "stopprof"

if bNeedExit:

targetprice = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)

ids = context.stra_sell(self.__code__, targetprice, abs(curPos), usertag) if isLong else context.stra_buy(self.__code__, price, abs(curPos), usertag)

for localid in ids:

self.__orders__[localid] = localid

# 出场逻辑执行以后结束逻辑

return

now = makeTime(self.__ctx__.stra_get_date(), self.__ctx__.stra_get_time(), self.__ctx__.stra_get_secs())

# 成交量为0且上一个成交均价为0,则需要退出

if newTick["volumn"] == 0 and self._last_atp__ == 0.0:

return

#如果已经入场,且有频率限制,则做频率检查

if self.__last_entry_time__ is not None and self.__freq__ != 0:

#当前时间,一定要从api获取,不然回测会有问题

span = now - self.__last_entry_time__

if span.total_seconds() <= self.__freq__:

return

hisTicks = context.stra_get_ticks(self.__code__, self.count + 1)

if hisTicks.size != self.count+1:

return

if (len(newTick["askprice"]) == 0) or (len(newTick["bidprice"]) == 0):

return

spread = newTick["askprice"][0] - newTick["bidprice"][0]

total_OIR = 0.0

total_rou = 0.0

for i in range(1, self.count + 1):

prevTick = hisTicks.get_tick(i-1)

curTick = hisTicks.get_tick(i)

lastBidPx = self.get_price(prevTick, -1)

lastAskPx = self.get_price(prevTick, 1)

lastBidQty = prevTick["bidqty"][0] if len(prevTick["bidqty"]) > 0 else 0

lastAskQty = prevTick["askqty"][0] if len(prevTick["askqty"]) > 0 else 0

curBidPx = self.get_price(curTick, -1)

curAskPx = self.get_price(curTick, 1)

curBidQty = curTick["bidqty"][0] if len(curTick["bidqty"]) > 0 else 0

curAskQty = curTick["askqty"][0] if len(curTick["askqty"]) > 0 else 0

delta_vb = 0.0

delta_va = 0.0

if curBidPx < lastBidPx:

delta_vb = 0.0

elif curBidPx == lastBidPx:

delta_vb = curBidQty - lastBidQty

else:

delta_vb = curBidQty

if curAskPx < lastAskPx:

delta_va = curAskQty

elif curAskPx == lastAskPx:

delta_va = curAskQty - lastAskQty

else:

delta_va = 0.0

voi = delta_vb - delta_va

total_OIR += self.beta_oi[i-1]*voi/spread

#计算委比

rou = (curBidQty - curAskQty)/(curBidQty + curAskQty)

total_rou += self.beta_rou[i-1]*rou/spread

prevTick = hisTicks.get_tick(-2)

# t-1时刻的中间价

prevMP = (self.get_price(prevTick, -1) + self.get_price(prevTick, 1))/2

# 最新的中间价

curMP = (newTick["askprice"][0] + newTick["bidprice"][0])/2

# 两个快照之间的成交均价

if newTick["volumn"] != 0:

avgTrdPx = newTick["turn_over"]/newTick["volumn"]/self.__comm_info__.volscale

elif self._last_atp__!= 0:

avgTrdPx = self._last_atp__

else:

avgTrdPx = curMP

self._last_atp__ = avgTrdPx

# 计算中间价回归因子

curR = avgTrdPx - (prevMP + curMP) / 2

# 计算预期中间价变化量

efpc = self.beta_0 + total_OIR + total_rou + self.beta_r * curR / spread

if efpc >= self.threshold:

targetPos = self.__lots__

diffPos = targetPos - curPos

if diffPos != 0.0:

targetPx = newTick["askprice"][0]

ids = context.stra_buy(self.__code__, targetPx, abs(diffPos), "enterlong")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

self.__last_entry_time__ = now

self._max_dyn_prof = 0

self._max_dyn_loss = 0

elif efpc <= -self.threshold:

targetPos = -self.__lots__

diffPos = targetPos - curPos

if diffPos != 0:

targetPx = newTick["bidprice"][0]

ids = context.stra_sell(self.__code__, targetPx, abs(diffPos), "entershort")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

self.__last_entry_time__ = now

self._max_dyn_prof = 0.0

self._max_dyn_loss = 0.0

def on_bar(self, context:HftContext, stdCode:str, period:str, newBar:dict):

return

def on_channel_ready(self, context:HftContext):

undone = context.stra_get_undone(self.__code__)

if undone != 0 and len(self.__orders__.keys()) == 0:

context.stra_log_text("%s存在不在管理中的未完成单%f手,全部撤销" % (self.__code__, undone))

isBuy = (undone > 0)

ids = context.stra_cancel_all(self.__code__, isBuy)

for localid in ids:

self.__orders__[localid] = localid

self.__cancel_cnt__ += len(ids)

context.stra_log_text("在途撤单数 -> %d" % (self.__cancel_cnt__))

self.__channel_ready__ = True

def on_channel_lost(self, context:HftContext):

context.stra_log_text("交易通道连接丢失")

self.__channel_ready__ = False

def on_entrust(self, context:HftContext, localid:int, stdCode:str, bSucc:bool, msg:str, userTag:str):

return

def on_order(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, totalQty:float, leftQty:float, price:float, isCanceled:bool, userTag:str):

if localid not in self.__orders__:

return

if isCanceled or leftQty == 0:

self.__orders__.pop(localid)

if self.__cancel_cnt__ > 0:

self.__cancel_cnt__ -= 1

self.__ctx__.stra_log_text("在途撤单数 -> %d" % (self.__cancel_cnt__))

return

def on_trade(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, qty:float, price:float, userTag:str):

self._last_entry_price = price

模型回测

模型编码完成以后,我们就可以考虑模型回测了。

数据准备

笔者共享了股指期货主力合约2020年12月到2021年1月份的tick数据到百度网盘中,地址如下:
https://pan.baidu.com/s/1Bdxh... 提取码:d6bh

文件名为CFFEX.IF.HOT_ticks_20201201_20210118.7z,读者可以自行获取。

数据格式为WonderTrader内部压缩存放的数据格式.dsb,如果要做回归的话,那么还需要将.dsb文件导出为csv文件。wtpy中的WtDtHelper模块中就提供了数据转换的方法,调用代码如下:

from wtpy.wrapper import WtDataHelper

import os

dtHelper = WtDataHelper()

dtHelper.dump_ticks('dsb文件所在的目录', '要输出的csv目录')

csv数据导出以后,就可以利用python读取数据进行模型线性回归了。

回测入口

线性回归做好以后,得到一组系数。然后编写回测入口脚本,代码如下:

from wtpy import WtBtEngine, EngineType

from strategies.HftStraOrdImbal import HftStraOrderImbalance

def read_params_from_csv(filename) -> dict:

params = {

"beta_0":0.0,

"beta_r":0.0,

"beta_oi":[],

"beta_rou":[]

}

f = open(filename, "r")

lines = f.readlines()

f.close()

for row in range(1, len(lines)):

curLine = lines[row]

ay = curLine.split(",")

if row == 1:

params["beta_0"] = float(ay[1])

elif row == 14:

params["beta_r"] = float(ay[1])

elif row > 1 and row <=7:

params["beta_oi"].append(float(ay[1]))

elif row > 7 and row <=13:

params["beta_rou"].append(float(ay[1]))

return params

if __name__ == "__main__":

# 创建一个运行环境,并加入策略

engine = WtBtEngine(EngineType.ET_HFT)

engine.init('.\\Common\\', "configbt.json")

engine.configBacktest(202101040900,202101181500)

engine.configBTStorage(mode="csv", path="./storage/")

engine.commitBTConfig()

active_sections = [

{

"start": 931,

"end": 1457

}

]

stop_params = {

"active":True, # 是否启用止盈止损

"stop_ticks": -25, # 止损跳数,如果浮亏达到该跳数,则直接止损

"track_threshold": 15, # 追踪止盈阈值跳数,超过该阈值则触发追踪止盈

"fallback_boundary": 2, # 追踪止盈回撤边界跳数,即浮盈跳数回撤到该边界值以下,立即止盈

"calc_price":0

}

params = read_params_from_csv('IF_10ticks_20201201_20201231.csv')

straInfo = HftStraOrderImbalance(name='hft_IF',

code="CFFEX.IF.HOT",

count=6,

lots=1,

threshold=0.3,

expsecs=5,

offset=0,

freq=0,

active_secs=active_sections,

stoppl=stop_params,

**params)

engine.set_hft_strategy(straInfo)

engine.run_backtest()

kw = input('press any key to exit\n')

engine.release_backtest()

回测结果

我们使用2020年12月的全部tick进行线性回归,得到的参数用于2021年1月回测得到的绩效如下:

date,closeprofit,positionprofit,dynbalance,fee

20210104,-11160.00,0.00,-20941.01,9781.01

20210105,-20100.00,0.00,-40712.85,20612.85

20210106,-60.00,0.00,-31828.36,31768.36

20210107,4140.00,0.00,-40344.73,44484.73

20210108,-11760.00,0.00,-66329.60,54569.60

20210111,-41280.00,0.00,-107444.80,66164.80

20210112,-66000.00,0.00,-142723.28,76723.28

20210113,-87240.00,0.00,-175926.18,88686.18

20210114,-106680.00,0.00,-202219.21,95539.21

20210115,-96840.00,0.00,-197721.78,100881.78

20210118,-110760.00,0.00,-219671.31,108911.31

【Python】WonderTrader高频交易初探及v0.6发布
从上面的绩效可以看出,该模型的表现倒是比较稳定,可惜是稳定的亏钱[手动狗头],实在是难堪大用。

绩效分析

策略表现虽然难以入目,但是我们还是要进行绩效分析,看看有没有可以改进的点。WonderTrader针对HFT回测生成了回合明细closes.csv,可以看到每个回合的进场点和出场点,以及每个回合潜在最大收益和潜在最大亏损。用户可以利用回合明细根据需求自行分析每个回合进出场的点位是否合理,以及如何优化等问题。

code,direct,opentime,openprice,closetime,closeprice,qty,profit,maxprofit,maxloss,totalprofit,entertag,exittag

CFFEX.IF.HOT,SHORT,20210104093156400,5221,20210104093218400,5221,1,-0,480,-540,0,entershort,enterlong

CFFEX.IF.HOT,LONG,20210104093218400,5221,20210104093219900,5222,1,300,300,0,300,enterlong,entershort

CFFEX.IF.HOT,SHORT,20210104093219900,5222,20210104093226900,5223,1,-300,120,-480,0,entershort,enterlong

CFFEX.IF.HOT,LONG,20210104093226900,5223,20210104093301400,5216.8,1,-1860,240,-2040,-1860,enterlong,stoploss

CFFEX.IF.HOT,SHORT,20210104093317400,5210.8,20210104093319900,5211.2,1,-120,0,-480,-1980,entershort,enterlong

CFFEX.IF.HOT,LONG,20210104093320400,5210.6,20210104093347900,5211.4,1,240,540,-1080,-1740,enterlong,entershort

CFFEX.IF.HOT,SHORT,20210104093347900,5211.4,20210104093410900,5211,1,120,660,-480,-1620,entershort,enterlong

CFFEX.IF.HOT,LONG,20210104093410900,5211,20210104093424400,5203.4,1,-2280,0,-2460,-3900,enterlong,stoploss

CFFEX.IF.HOT,SHORT,20210104093432900,5201.2,20210104093446900,5207.2,1,-1800,120,-2040,-5700,entershort,stoploss

结束语

到此为止,一个完整的HFT策略开发流程就走完了。虽然该模型似乎已经失效,但是笔者并没有深入分析当前IF的市场和原模型回测的时间区间的IF的市场之间的差别,另外笔者也没有拓展到别的品种进行分析。再者,笔者的主要目的是演示HFT策略的研发流程,所以关于模型方面难免有所疏漏。模型方面的做法,请各位读者稍作参考即可。

值得一提的是,从上面的源码中可以看到,WonderTrader针对HFT策略的交易接口简化成了买、卖两个交易接口,目的就是为了简化策略开发的逻辑,让策略人研发人员将更多的精力集中在策略逻辑本身。而买卖对应的开平逻辑,会在C++核心通过配置文件actionpolicy.json进行控制,自动处理开平。另外,该策略使用Python开发,而C++版本的相同策略,回测时间约为Python版本的十分之一左右,如果有读者想要利用WonderTrader上高频,在开发语言方面,还请各位读者仔细斟酌。

笔者也会不断地完善WonderTrader在HFT策略方面的功能。也希望各位读者能多多指正WonderTrader的疏漏,帮助WonderTrader完善起来,也能为更多的用户提供更好的基础设施服务。

最后再来一波广告

WonderTrader的github地址:https://github.com/wondertrad...

WonderTrader官网地址:https://wondertrader.github.io

wtpy的github地址:https://github.com/wondertrad...

【Python】WonderTrader高频交易初探及v0.6发布

pythonc++量化金融科技

阅读 19更新于 3 分钟前

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议


WonderTrader专栏

主要发布WonderTrader相关的各种官方资讯

avatar

WonderTrader

14 声望

5 粉丝

0 条评论

得票时间

avatar

WonderTrader

14 声望

5 粉丝

宣传栏

【Python】WonderTrader高频交易初探及v0.6发布

前言

自从WonderTrader实现了HFT策略引擎以来,一直都没有时间彻底的将高频策略研发、回测、仿真、实盘整个流程彻底走通一遍。所以趁着最近公司要上高频的机会,笔者基于WonderTrader把高频策略的应用彻底梳理了一遍。

本文的主要目的就是帮助用户初步了解WonderTraderHFT引擎上如何开发策略的。

平台准备

之前实盘框架下的HFT引擎已经基本完成,但是回测框架下的HFT策略的支持因为事情太多一直没有完善。这次彻底梳理HFT引擎,正好把回测部分也完善了一下。HFT回测引擎完善之后,WonderTrader也正好发布一个新版本v0.6.0。

高频模型介绍

本文采用的高频模型,源自Darryl ShenLinacre College University of Oxford)于2015年5月27日发表的《Order Imbalance Based Strategy in High Frequency Trading》一文(网络上可以找到)。

该模型基于Ltick数据中的委托量的不平衡因子、委比因子以及中间价回归因子三个因子,预测t0时刻之后的ktick数据的中间价的均价变化量,并以此构建线性模型。通过线性回归,得到各个因子的系数。线性方程如下:
【Python】WonderTrader高频交易初探及v0.6发布

方程中各个符号的具体含义,请感兴趣的读者自行检索。该文中使用2014年IF主力合约全年的tick进行回测,每次进出场以1手股指为单位,可以实现92.6% 的胜率,最优参数下,年化夏普率可以达到7.243,日均P&L在58600元。

模型实现

有了模型以后,我们开始来编写代码实现。因为本文旨在介绍HFT策略开发的流程,为了降低读者理解难度,策略都采用Python编写。

策略框架介绍

首先我们来看一下一个高频策略的基本结构:

class BaseHftStrategy:

'''

HFT策略基础类,所有的策略都从该类派生\n

包含了策略的基本开发框架

'''

def __init__(self, name):

self.__name__ = name

def name(self):

return self.__name__

def on_init(self, context:HftContext):

'''

策略初始化,启动的时候调用\n

用于加载自定义数据\n

@context 策略运行上下文

'''

return

def on_tick(self, context:HftContext, stdCode:str, newTick:dict):

'''

Tick数据进来时调用\n

@context 策略运行上下文\n

@stdCode 合约代码\n

@newTick 最新Tick

'''

return

def on_order_detail(self, context:HftContext, stdCode:str, newOrdQue:dict):

'''

逐笔委托数据进来时调用\n

@context 策略运行上下文\n

@stdCode 合约代码\n

@newOrdQue 最新逐笔委托

'''

return

def on_order_queue(self, context:HftContext, stdCode:str, newOrdQue:dict):

'''

委托队列数据进来时调用\n

@context 策略运行上下文\n

@stdCode 合约代码\n

@newOrdQue 最新委托队列

'''

return

def on_transaction(self, context:HftContext, stdCode:str, newTrans:dict):

'''

逐笔成交数据进来时调用\n

@context 策略运行上下文\n

@stdCode 合约代码\n

@newTrans 最新逐笔成交

'''

return

def on_bar(self, context:HftContext, stdCode:str, period:str, newBar:dict):

'''

K线闭合时回调

@context 策略上下文\n

@stdCode 合约代码

@period K线周期

@newBar 最新闭合的K线

'''

return

def on_channel_ready(self, context:HftContext):

'''

交易通道就绪通知\n

@context 策略上下文\n

'''

return

def on_channel_lost(self, context:HftContext):

'''

交易通道丢失通知\n

@context 策略上下文\n

'''

return

def on_entrust(self, context:HftContext, localid:int, stdCode:str, bSucc:bool, msg:str, userTag:str):

'''

下单结果回报

@context 策略上下文\n

@localid 本地订单id\n

@stdCode 合约代码\n

@bSucc 下单结果\n

@mes 下单结果描述

'''

return

def on_order(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, totalQty:float, leftQty:float, price:float, isCanceled:bool, userTag:str):

'''

订单回报

@context 策略上下文\n

@localid 本地订单id\n

@stdCode 合约代码\n

@isBuy 是否买入\n

@totalQty 下单数量\n

@leftQty 剩余数量\n

@price 下单价格\n

@isCanceled 是否已撤单

'''

return

def on_trade(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, qty:float, price:float, userTag:str):

'''

成交回报

@context 策略上下文\n

@stdCode 合约代码\n

@isBuy 是否买入\n

@qty 成交数量\n

@price 成交价格

'''

return

整个策略的结构大致可以分为四块:

  • 策略本身的回调
  • 行情数据的回调
  • 交易通道的回调
  • 交易回报的回调

其中行情数据的回调,主要包括on_tickon_barlevel2数据回调,本文中只需要关注on_tick即可;交易通道的回调,主要是通知策略交易通道的连接和断开事件;交易回报的回调,主要是订单回报、成交回报以及下单回报。

参数设计

根据模型的逻辑,我们设置回溯tick数为5,中间价变动的阈值为0.3,那么我们便可以将策略参数设计如下:

'''交易参数'''

self.__code__ = code #交易合约

self.__expsecs__ = expsecs #订单超时秒数,用于控制超时撤单

self.__freq__ = freq #交易频率控制,指定时间内限制信号数,单位秒

self.__lots__ = lots #单次交易手数

self.count = count #回溯tick条数

self.beta_0 = beta_0 #常量系数+残差

self.beta_r = beta_r #中间价回归因子系数

self.threshold = threshold #中间价变动阈值

self.beta_oi = beta_oi #成交量不平衡因子系数序列

self.beta_rou = beta_rou #委比因子系数序列

self.active_secs = active_secs #交易时间区间

self.stoppl = stoppl #止盈止损配置

核心逻辑

在大致了解了HFT策略的结构以后,我们就可以开始来编码了。整个策略的核心逻辑,集中在on_tick回调中,主要就是上述模型的计算,代码如下:

hisTicks = context.stra_get_ticks(self.__code__, self.count + 1)

if hisTicks.size != self.count+1:

return

if (len(newTick["askprice"]) == 0) or (len(newTick["bidprice"]) == 0):

return

spread = newTick["askprice"][0] - newTick["bidprice"][0]

total_OIR = 0.0

total_rou = 0.0

# 计算不平衡因子和委比因子的累加之和

for i in range(1, self.count + 1):

prevTick = hisTicks.get_tick(i-1)

curTick = hisTicks.get_tick(i)

lastBidPx = self.get_price(prevTick, -1)

lastAskPx = self.get_price(prevTick, 1)

lastBidQty = prevTick["bidqty"][0] if len(prevTick["bidqty"]) > 0 else 0

lastAskQty = prevTick["askqty"][0] if len(prevTick["askqty"]) > 0 else 0

curBidPx = self.get_price(curTick, -1)

curAskPx = self.get_price(curTick, 1)

curBidQty = curTick["bidqty"][0] if len(curTick["bidqty"]) > 0 else 0

curAskQty = curTick["askqty"][0] if len(curTick["askqty"]) > 0 else 0

delta_vb = 0.0

delta_va = 0.0

if curBidPx < lastBidPx:

delta_vb = 0.0

elif curBidPx == lastBidPx:

delta_vb = curBidQty - lastBidQty

else:

delta_vb = curBidQty

if curAskPx < lastAskPx:

delta_va = curAskQty

elif curAskPx == lastAskPx:

delta_va = curAskQty - lastAskQty

else:

delta_va = 0.0

voi = delta_vb - delta_va

total_OIR += self.beta_oi[i-1]*voi/spread

#计算委比

rou = (curBidQty - curAskQty)/(curBidQty + curAskQty)

total_rou += self.beta_rou[i-1]*rou/spread

prevTick = hisTicks.get_tick(-2)

# t-1时刻的中间价

prevMP = (self.get_price(prevTick, -1) + self.get_price(prevTick, 1))/2

# 最新的中间价

curMP = (newTick["askprice"][0] + newTick["bidprice"][0])/2

# 两个快照之间的成交均价

if newTick["volumn"] != 0:

avgTrdPx = newTick["turn_over"]/newTick["volumn"]/self.__comm_info__.volscale

elif self._last_atp__!= 0:

avgTrdPx = self._last_atp__

else:

avgTrdPx = curMP

self._last_atp__ = avgTrdPx

# 计算中间价回归因子

curR = avgTrdPx - (prevMP + curMP) / 2

# 计算预期中间价变化量

efpc = self.beta_0 + total_OIR + total_rou + self.beta_r * curR / spread

if efpc >= self.threshold:

targetPos = self.__lots__

diffPos = targetPos - curPos

if diffPos != 0.0:

targetPx = newTick["askprice"][0]

ids = context.stra_buy(self.__code__, targetPx, abs(diffPos), "enterlong")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

self.__last_entry_time__ = now

self._max_dyn_prof = 0

self._max_dyn_loss = 0

elif efpc <= -self.threshold:

targetPos = -self.__lots__

diffPos = targetPos - curPos

if diffPos != 0:

targetPx = newTick["bidprice"][0]

ids = context.stra_sell(self.__code__, targetPx, abs(diffPos), "entershort")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

self.__last_entry_time__ = now

self._max_dyn_prof = 0.0

self._max_dyn_loss = 0.0

止盈止损逻辑

但是对于高频策略,除了核心的进出场逻辑之外,止盈止损逻辑也是非常重要的一部分。本文中的策略采用固定点位止损+跟踪止盈来作为止盈止损逻辑,代码如下:

# 止盈止损逻辑

if curPos != 0 and self.stoppl["active"]:

isLong = (curPos > 0)

# 首先获取最新的价格,calc_price为0的话,使用对手价计算浮盈,calc_price为1的话,使用最新价计算浮盈

price = 0

if self.stoppl["calc_price"] == 0:

price = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)

else:

price = newTick["price"]

#然后计算浮动盈亏的跳数

diffTicks = (price - self._last_entry_price)*(1 if isLong else -1) / self.__comm_info__.pricetick

if diffTicks > 0:

self._max_dyn_prof = max(self._max_dyn_prof, diffTicks)

else:

self._max_dyn_loss = min(self._max_dyn_loss, diffTicks)

bNeedExit = False

usertag = ''

stop_ticks = self.stoppl["stop_ticks"]

track_threshold = self.stoppl["track_threshold"]

fallback_boundary = self.stoppl["fallback_boundary"]

if diffTicks <= stop_ticks:

context.stra_log_text("浮亏%.0f超过%d跳,止损离场" % (diffTicks, stop_ticks))

bNeedExit = True

usertag = "stoploss"

elif self._max_dyn_prof >= track_threshold and diffTicks <= fallback_boundary:

context.stra_log_text("浮赢回撤%.0f->%.0f[阈值%.0f->%.0f],止盈离场" % (self._max_dyn_prof, diffTicks, track_threshold, fallback_boundary))

bNeedExit = True

usertag = "stopprof"

if bNeedExit:

targetprice = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)

ids = context.stra_sell(self.__code__, targetprice, abs(curPos), usertag) if isLong else context.stra_buy(self.__code__, price, abs(curPos), usertag)

for localid in ids:

self.__orders__[localid] = localid

# 出场逻辑执行以后结束逻辑

return

收盘前出场的逻辑

有了止盈止损逻辑,我们还需要添加一段收盘前出场的逻辑,代码如下:

curMin = context.stra_get_time()

curPos = context.stra_get_position(stdCode)

# 不在交易时间,则检查是否有持仓

# 如果有持仓,则需要清理

if not self.is_active(curMin):

self._last_atp__ = 0.0

if curPos == 0:

return

self.__to_clear__ = True

else:

self.__to_clear__ = False

# 如果需要清理持仓,且不在撤单过程中

if self.__to_clear__ :

if self.__cancel_cnt__ == 0:

if curPos > 0:

# 以对手价挂单

targetPx = self.get_price(newTick, -1)

ids = context.stra_sell(self.__code__, targetPx, abs(curPos), "deadline")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

elif curPos < 0:

# 以对手价挂单

targetPx = self.get_price(newTick, 1)

ids = context.stra_buy(self.__code__, targetPx, abs(curPos), "deadline")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

return

订单管理逻辑

然后,我们还需要添加一段订单管理的逻辑,代码如下:

def check_orders(self, ctx:HftContext):

#如果未完成订单不为空

ord_cnt = len(self.__orders__.keys())

if ord_cnt > 0 and self.__last_entry_time__ is not None:

#当前时间,一定要从api获取,不然回测会有问题

now = makeTime(ctx.stra_get_date(), ctx.stra_get_time(), ctx.stra_get_secs())

span = now - self.__last_entry_time__

total_secs = span.total_seconds()

if total_secs >= self.__expsecs__: #如果订单超时,则需要撤单

ctx.stra_log_text("%d条订单超时撤单" % (ord_cnt))

for localid in self.__orders__:

ctx.stra_cancel(localid)

self.__cancel_cnt__ += 1

ctx.stra_log_text("在途撤单数 -> %d" % (self.__cancel_cnt__))

其他逻辑

除了上述的逻辑之外,我们还需要处理一些细节问题,如:

  • 处理订单回报,用于更新本地订单的状态;
  • 处理成交回报,用于更新入场价格,计算浮动盈亏
  • 处理交易通道就绪的回报,用于检查是否有不在管理内的未完成单

完整源码

整个策略的完整代码如下:

from wtpy import BaseHftStrategy

from wtpy import HftContext

from datetime import datetime

def makeTime(date:int, time:int, secs:int):

'''

将系统时间转成datetime\n

@date 日期,格式如20200723\n

@time 时间,精确到分,格式如0935\n

@secs 秒数,精确到毫秒,格式如37500

'''

return datetime(year=int(date/10000), month=int(date%10000/100), day=date%100,

hour=int(time/100), minute=time%100, second=int(secs/1000), microsecond=secs%1000*1000)

class HftStraOrderImbalance(BaseHftStrategy):

def __init__(self, name:str, code:str, count:int, lots:int, beta_0:float, beta_r:float, threshold:float,

beta_oi:list, beta_rou:list, expsecs:int, offset:int, freq:int, active_secs:list, stoppl:dict, reserve:int=0):

BaseHftStrategy.__init__(self, name)

'''交易参数'''

self.__code__ = code #交易合约

self.__expsecs__ = expsecs #订单超时秒数,用于控制超时撤单

self.__freq__ = freq #交易频率控制,指定时间内限制信号数,单位秒

self.__lots__ = lots #单次交易手数

self.count = count #回溯tick条数

self.beta_0 = beta_0 #常量系数+残差

self.beta_r = beta_r #中间价回归因子系数

self.threshold = threshold #中间价变动阈值

self.beta_oi = beta_oi #成交量不平衡因子系数序列

self.beta_rou = beta_rou #委比因子系数序列

self.active_secs = active_secs #交易时间区间

self.stoppl = stoppl #止盈止损配置

'''内部数据'''

self.__last_tick__ = None #上一笔行情

self.__orders__ = dict() #策略相关的订单

self.__last_entry_time__ = None #上次入场时间

self.__cancel_cnt__ = 0 #正在撤销的订单数

self.__channel_ready__ = False #通道是否就绪

self.__comm_info__ = None

self.__to_clear__ = False

self._last_entry_price = 0.0

self._max_dyn_prof = 0.0

self._max_dyn_loss = 0.0

self._last_atp__ = 0.0

def is_active(self, curMin:int) -> bool:

for sec in self.active_secs:

if sec["start"] <= curMin and curMin <= sec["end"]:

return True

return False

def on_init(self, context:HftContext):

'''

策略初始化,启动的时候调用\n

用于加载自定义数据\n

@context 策略运行上下文

'''

self.__comm_info__ = context.stra_get_comminfo(self.__code__)

#先订阅实时数据

context.stra_sub_ticks(self.__code__)

self.__ctx__ = context

def check_orders(self, ctx:HftContext):

#如果未完成订单不为空

ord_cnt = len(self.__orders__.keys())

if ord_cnt > 0 and self.__last_entry_time__ is not None:

#当前时间,一定要从api获取,不然回测会有问题

now = makeTime(ctx.stra_get_date(), ctx.stra_get_time(), ctx.stra_get_secs())

span = now - self.__last_entry_time__

total_secs = span.total_seconds()

if total_secs >= self.__expsecs__: #如果订单超时,则需要撤单

ctx.stra_log_text("%d条订单超时撤单" % (ord_cnt))

for localid in self.__orders__:

ctx.stra_cancel(localid)

self.__cancel_cnt__ += 1

ctx.stra_log_text("在途撤单数 -> %d" % (self.__cancel_cnt__))

def get_price(self, newTick, pricemode=0):

if pricemode == 0:

return newTick["price"]

elif pricemode == 1:

return newTick["askprice"][0] if len(newTick["askprice"])>0 else newTick["price"]

elif pricemode == -1:

return newTick["bidprice"][0] if len(newTick["bidprice"])>0 else newTick["price"]

def on_tick(self, context:HftContext, stdCode:str, newTick:dict):

if self.__code__ != stdCode:

return

#如果有未完成订单,则进入订单管理逻辑

if len(self.__orders__.keys()) != 0:

self.check_orders(context)

return

if not self.__channel_ready__:

return

curMin = context.stra_get_time()

curPos = context.stra_get_position(stdCode)

# 不在交易时间,则检查是否有持仓

# 如果有持仓,则需要清理

if not self.is_active(curMin):

self._last_atp__ = 0.0

if curPos == 0:

return

self.__to_clear__ = True

else:

self.__to_clear__ = False

# 如果需要清理持仓,且不在撤单过程中

if self.__to_clear__ :

if self.__cancel_cnt__ == 0:

if curPos > 0:

targetPx = self.get_price(newTick, -1)

ids = context.stra_sell(self.__code__, targetPx, abs(curPos), "deadline")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

elif curPos < 0:

targetPx = self.get_price(newTick, 1)

ids = context.stra_buy(self.__code__, targetPx, abs(curPos), "deadline")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

return

# 止盈止损逻辑

if curPos != 0 and self.stoppl["active"]:

isLong = (curPos > 0)

price = 0

if self.stoppl["calc_price"] == 0:

price = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)

else:

price = newTick["price"]

diffTicks = (price - self._last_entry_price)*(1 if isLong else -1) / self.__comm_info__.pricetick

if diffTicks > 0:

self._max_dyn_prof = max(self._max_dyn_prof, diffTicks)

else:

self._max_dyn_loss = min(self._max_dyn_loss, diffTicks)

bNeedExit = False

usertag = ''

stop_ticks = self.stoppl["stop_ticks"]

track_threshold = self.stoppl["track_threshold"]

fallback_boundary = self.stoppl["fallback_boundary"]

if diffTicks <= stop_ticks:

context.stra_log_text("浮亏%.0f超过%d跳,止损离场" % (diffTicks, stop_ticks))

bNeedExit = True

usertag = "stoploss"

elif self._max_dyn_prof >= track_threshold and diffTicks <= fallback_boundary:

context.stra_log_text("浮赢回撤%.0f->%.0f[阈值%.0f->%.0f],止盈离场" % (self._max_dyn_prof, diffTicks, track_threshold, fallback_boundary))

bNeedExit = True

usertag = "stopprof"

if bNeedExit:

targetprice = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)

ids = context.stra_sell(self.__code__, targetprice, abs(curPos), usertag) if isLong else context.stra_buy(self.__code__, price, abs(curPos), usertag)

for localid in ids:

self.__orders__[localid] = localid

# 出场逻辑执行以后结束逻辑

return

now = makeTime(self.__ctx__.stra_get_date(), self.__ctx__.stra_get_time(), self.__ctx__.stra_get_secs())

# 成交量为0且上一个成交均价为0,则需要退出

if newTick["volumn"] == 0 and self._last_atp__ == 0.0:

return

#如果已经入场,且有频率限制,则做频率检查

if self.__last_entry_time__ is not None and self.__freq__ != 0:

#当前时间,一定要从api获取,不然回测会有问题

span = now - self.__last_entry_time__

if span.total_seconds() <= self.__freq__:

return

hisTicks = context.stra_get_ticks(self.__code__, self.count + 1)

if hisTicks.size != self.count+1:

return

if (len(newTick["askprice"]) == 0) or (len(newTick["bidprice"]) == 0):

return

spread = newTick["askprice"][0] - newTick["bidprice"][0]

total_OIR = 0.0

total_rou = 0.0

for i in range(1, self.count + 1):

prevTick = hisTicks.get_tick(i-1)

curTick = hisTicks.get_tick(i)

lastBidPx = self.get_price(prevTick, -1)

lastAskPx = self.get_price(prevTick, 1)

lastBidQty = prevTick["bidqty"][0] if len(prevTick["bidqty"]) > 0 else 0

lastAskQty = prevTick["askqty"][0] if len(prevTick["askqty"]) > 0 else 0

curBidPx = self.get_price(curTick, -1)

curAskPx = self.get_price(curTick, 1)

curBidQty = curTick["bidqty"][0] if len(curTick["bidqty"]) > 0 else 0

curAskQty = curTick["askqty"][0] if len(curTick["askqty"]) > 0 else 0

delta_vb = 0.0

delta_va = 0.0

if curBidPx < lastBidPx:

delta_vb = 0.0

elif curBidPx == lastBidPx:

delta_vb = curBidQty - lastBidQty

else:

delta_vb = curBidQty

if curAskPx < lastAskPx:

delta_va = curAskQty

elif curAskPx == lastAskPx:

delta_va = curAskQty - lastAskQty

else:

delta_va = 0.0

voi = delta_vb - delta_va

total_OIR += self.beta_oi[i-1]*voi/spread

#计算委比

rou = (curBidQty - curAskQty)/(curBidQty + curAskQty)

total_rou += self.beta_rou[i-1]*rou/spread

prevTick = hisTicks.get_tick(-2)

# t-1时刻的中间价

prevMP = (self.get_price(prevTick, -1) + self.get_price(prevTick, 1))/2

# 最新的中间价

curMP = (newTick["askprice"][0] + newTick["bidprice"][0])/2

# 两个快照之间的成交均价

if newTick["volumn"] != 0:

avgTrdPx = newTick["turn_over"]/newTick["volumn"]/self.__comm_info__.volscale

elif self._last_atp__!= 0:

avgTrdPx = self._last_atp__

else:

avgTrdPx = curMP

self._last_atp__ = avgTrdPx

# 计算中间价回归因子

curR = avgTrdPx - (prevMP + curMP) / 2

# 计算预期中间价变化量

efpc = self.beta_0 + total_OIR + total_rou + self.beta_r * curR / spread

if efpc >= self.threshold:

targetPos = self.__lots__

diffPos = targetPos - curPos

if diffPos != 0.0:

targetPx = newTick["askprice"][0]

ids = context.stra_buy(self.__code__, targetPx, abs(diffPos), "enterlong")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

self.__last_entry_time__ = now

self._max_dyn_prof = 0

self._max_dyn_loss = 0

elif efpc <= -self.threshold:

targetPos = -self.__lots__

diffPos = targetPos - curPos

if diffPos != 0:

targetPx = newTick["bidprice"][0]

ids = context.stra_sell(self.__code__, targetPx, abs(diffPos), "entershort")

#将订单号加入到管理中

for localid in ids:

self.__orders__[localid] = localid

self.__last_entry_time__ = now

self._max_dyn_prof = 0.0

self._max_dyn_loss = 0.0

def on_bar(self, context:HftContext, stdCode:str, period:str, newBar:dict):

return

def on_channel_ready(self, context:HftContext):

undone = context.stra_get_undone(self.__code__)

if undone != 0 and len(self.__orders__.keys()) == 0:

context.stra_log_text("%s存在不在管理中的未完成单%f手,全部撤销" % (self.__code__, undone))

isBuy = (undone > 0)

ids = context.stra_cancel_all(self.__code__, isBuy)

for localid in ids:

self.__orders__[localid] = localid

self.__cancel_cnt__ += len(ids)

context.stra_log_text("在途撤单数 -> %d" % (self.__cancel_cnt__))

self.__channel_ready__ = True

def on_channel_lost(self, context:HftContext):

context.stra_log_text("交易通道连接丢失")

self.__channel_ready__ = False

def on_entrust(self, context:HftContext, localid:int, stdCode:str, bSucc:bool, msg:str, userTag:str):

return

def on_order(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, totalQty:float, leftQty:float, price:float, isCanceled:bool, userTag:str):

if localid not in self.__orders__:

return

if isCanceled or leftQty == 0:

self.__orders__.pop(localid)

if self.__cancel_cnt__ > 0:

self.__cancel_cnt__ -= 1

self.__ctx__.stra_log_text("在途撤单数 -> %d" % (self.__cancel_cnt__))

return

def on_trade(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, qty:float, price:float, userTag:str):

self._last_entry_price = price

模型回测

模型编码完成以后,我们就可以考虑模型回测了。

数据准备

笔者共享了股指期货主力合约2020年12月到2021年1月份的tick数据到百度网盘中,地址如下:
https://pan.baidu.com/s/1Bdxh... 提取码:d6bh

文件名为CFFEX.IF.HOT_ticks_20201201_20210118.7z,读者可以自行获取。

数据格式为WonderTrader内部压缩存放的数据格式.dsb,如果要做回归的话,那么还需要将.dsb文件导出为csv文件。wtpy中的WtDtHelper模块中就提供了数据转换的方法,调用代码如下:

from wtpy.wrapper import WtDataHelper

import os

dtHelper = WtDataHelper()

dtHelper.dump_ticks('dsb文件所在的目录', '要输出的csv目录')

csv数据导出以后,就可以利用python读取数据进行模型线性回归了。

回测入口

线性回归做好以后,得到一组系数。然后编写回测入口脚本,代码如下:

from wtpy import WtBtEngine, EngineType

from strategies.HftStraOrdImbal import HftStraOrderImbalance

def read_params_from_csv(filename) -> dict:

params = {

"beta_0":0.0,

"beta_r":0.0,

"beta_oi":[],

"beta_rou":[]

}

f = open(filename, "r")

lines = f.readlines()

f.close()

for row in range(1, len(lines)):

curLine = lines[row]

ay = curLine.split(",")

if row == 1:

params["beta_0"] = float(ay[1])

elif row == 14:

params["beta_r"] = float(ay[1])

elif row > 1 and row <=7:

params["beta_oi"].append(float(ay[1]))

elif row > 7 and row <=13:

params["beta_rou"].append(float(ay[1]))

return params

if __name__ == "__main__":

# 创建一个运行环境,并加入策略

engine = WtBtEngine(EngineType.ET_HFT)

engine.init('.\\Common\\', "configbt.json")

engine.configBacktest(202101040900,202101181500)

engine.configBTStorage(mode="csv", path="./storage/")

engine.commitBTConfig()

active_sections = [

{

"start": 931,

"end": 1457

}

]

stop_params = {

"active":True, # 是否启用止盈止损

"stop_ticks": -25, # 止损跳数,如果浮亏达到该跳数,则直接止损

"track_threshold": 15, # 追踪止盈阈值跳数,超过该阈值则触发追踪止盈

"fallback_boundary": 2, # 追踪止盈回撤边界跳数,即浮盈跳数回撤到该边界值以下,立即止盈

"calc_price":0

}

params = read_params_from_csv('IF_10ticks_20201201_20201231.csv')

straInfo = HftStraOrderImbalance(name='hft_IF',

code="CFFEX.IF.HOT",

count=6,

lots=1,

threshold=0.3,

expsecs=5,

offset=0,

freq=0,

active_secs=active_sections,

stoppl=stop_params,

**params)

engine.set_hft_strategy(straInfo)

engine.run_backtest()

kw = input('press any key to exit\n')

engine.release_backtest()

回测结果

我们使用2020年12月的全部tick进行线性回归,得到的参数用于2021年1月回测得到的绩效如下:

date,closeprofit,positionprofit,dynbalance,fee

20210104,-11160.00,0.00,-20941.01,9781.01

20210105,-20100.00,0.00,-40712.85,20612.85

20210106,-60.00,0.00,-31828.36,31768.36

20210107,4140.00,0.00,-40344.73,44484.73

20210108,-11760.00,0.00,-66329.60,54569.60

20210111,-41280.00,0.00,-107444.80,66164.80

20210112,-66000.00,0.00,-142723.28,76723.28

20210113,-87240.00,0.00,-175926.18,88686.18

20210114,-106680.00,0.00,-202219.21,95539.21

20210115,-96840.00,0.00,-197721.78,100881.78

20210118,-110760.00,0.00,-219671.31,108911.31

【Python】WonderTrader高频交易初探及v0.6发布
从上面的绩效可以看出,该模型的表现倒是比较稳定,可惜是稳定的亏钱[手动狗头],实在是难堪大用。

绩效分析

策略表现虽然难以入目,但是我们还是要进行绩效分析,看看有没有可以改进的点。WonderTrader针对HFT回测生成了回合明细closes.csv,可以看到每个回合的进场点和出场点,以及每个回合潜在最大收益和潜在最大亏损。用户可以利用回合明细根据需求自行分析每个回合进出场的点位是否合理,以及如何优化等问题。

code,direct,opentime,openprice,closetime,closeprice,qty,profit,maxprofit,maxloss,totalprofit,entertag,exittag

CFFEX.IF.HOT,SHORT,20210104093156400,5221,20210104093218400,5221,1,-0,480,-540,0,entershort,enterlong

CFFEX.IF.HOT,LONG,20210104093218400,5221,20210104093219900,5222,1,300,300,0,300,enterlong,entershort

CFFEX.IF.HOT,SHORT,20210104093219900,5222,20210104093226900,5223,1,-300,120,-480,0,entershort,enterlong

CFFEX.IF.HOT,LONG,20210104093226900,5223,20210104093301400,5216.8,1,-1860,240,-2040,-1860,enterlong,stoploss

CFFEX.IF.HOT,SHORT,20210104093317400,5210.8,20210104093319900,5211.2,1,-120,0,-480,-1980,entershort,enterlong

CFFEX.IF.HOT,LONG,20210104093320400,5210.6,20210104093347900,5211.4,1,240,540,-1080,-1740,enterlong,entershort

CFFEX.IF.HOT,SHORT,20210104093347900,5211.4,20210104093410900,5211,1,120,660,-480,-1620,entershort,enterlong

CFFEX.IF.HOT,LONG,20210104093410900,5211,20210104093424400,5203.4,1,-2280,0,-2460,-3900,enterlong,stoploss

CFFEX.IF.HOT,SHORT,20210104093432900,5201.2,20210104093446900,5207.2,1,-1800,120,-2040,-5700,entershort,stoploss

结束语

到此为止,一个完整的HFT策略开发流程就走完了。虽然该模型似乎已经失效,但是笔者并没有深入分析当前IF的市场和原模型回测的时间区间的IF的市场之间的差别,另外笔者也没有拓展到别的品种进行分析。再者,笔者的主要目的是演示HFT策略的研发流程,所以关于模型方面难免有所疏漏。模型方面的做法,请各位读者稍作参考即可。

值得一提的是,从上面的源码中可以看到,WonderTrader针对HFT策略的交易接口简化成了买、卖两个交易接口,目的就是为了简化策略开发的逻辑,让策略人研发人员将更多的精力集中在策略逻辑本身。而买卖对应的开平逻辑,会在C++核心通过配置文件actionpolicy.json进行控制,自动处理开平。另外,该策略使用Python开发,而C++版本的相同策略,回测时间约为Python版本的十分之一左右,如果有读者想要利用WonderTrader上高频,在开发语言方面,还请各位读者仔细斟酌。

笔者也会不断地完善WonderTrader在HFT策略方面的功能。也希望各位读者能多多指正WonderTrader的疏漏,帮助WonderTrader完善起来,也能为更多的用户提供更好的基础设施服务。

最后再来一波广告

WonderTrader的github地址:https://github.com/wondertrad...

WonderTrader官网地址:https://wondertrader.github.io

wtpy的github地址:https://github.com/wondertrad...

【Python】WonderTrader高频交易初探及v0.6发布

以上是 【Python】WonderTrader高频交易初探及v0.6发布 的全部内容, 来源链接: utcz.com/a/110283.html

回到顶部