上一篇文章里,我们提及了如何设计向量化回测与调参。通过scikit-learn这套框架,你已经能快速的验证你的任何想法。同时通过不断地努力,也发现了一个颇有成效的策略模式,那么接下来,让我们把回测模型一点点转变得更接近现实世界吧。
scikit-learn
所以这一章,我们将分别聊一遍 事件驱动回测框架 和 实盘交易系统 的架构与实现细节。这篇开始就逐渐有工程味道了,小伙伴们准备好哦。
参考
承接著向量化回测,这一篇我们先说事件驱动回测模块。开始之前,先明确列出我们对事件驱动系统的需求:
1. 回测时,将尽可能模拟真实环境。
2. 策略扩展性强
3. 策略有实时风控、仓位管理
4. 工程上,要能保证策略开发完后能无缝对接到实盘之中
我们一个一个需求来解决。
首先先明确事件驱动概念。事件驱动有别于向量化回测,更加接近真实,信息不再是一次性以一大块矩阵的形式送到系统之中,而是以事件的形式一个一个被喂到系统里,触发设定好的回调来进行逻辑上的处理。
事件驱动回测中的各个模块如下,我们将逐一对其进行介绍。
我们先来理解一下各个模块的概念和作用,之后再正式进入细节上的探究。
Strategy,顾名思义,是我们的核心策略模块,包含了4个重要组件,分别是
Strategy
Event Processor
Position Manager
Risk Manager
Commander
StrategyEngine,策略引擎,负责管理多个策略。策略引擎在这里是一个抽象,具体实现上有两种,第一种就是回测引擎,另一种则是后面将提及的实时交易引擎。有了策略引擎这一层抽象,就能保证我们的策略在回测完之后,能无缝地接入到实盘进行交易。(需求4)
StrategyEngine
这里我们先介绍BackTestEngine,即回测引擎。回测引擎在程序中主要负责将数据(如bar、depth、trade、order等)转化成事件、按时间发生的顺序,一个个地喂给策略。其中比较重要的是两个组件。
BackTestEngine
Matcher
Analyzer
pyfolio
概念讲的随意,但深入各个组件的设计上就有很多活干了。庆幸的是,如第一篇文章曾提及的那样,我们并不需要完全从零开始设计,借鉴开源项目vnpy和backtrader,将帮我们的设计省下不少时间。
vnpy
backtrader
首先,我们先关注组件之间的信息流交互。下面是一张顺序图,描述了当一个bar数据被送到系统里时发生的操作。
现在我们重点关注一下如下几个组件
BackTest Engine
BackTest Engine的设计理念就是广为人知的观察者模式。我们将把回测引擎设计成观察者模式中Subject,是发布者。而策略则是一个Subscriber,即订阅者,当有相关主题的数据发出来时,策略中的Event Processor就会对事件进行处理。
Subject
Subscriber
一般来说,事件和对应的处理函数主要分为这几类, 其中的逻辑细节由交易员或研究员自行补充完整:
这块重点提一下订阅者与发布者这两个核心基类,实现如下:
class Subject(object): def __init__(self): super(Subject, self).__init__() self._handler_dict = defaultdict(list)
def register(self, topic: typing.Hashable, handler: typing.Callable): if not callable(handler): raise ValueError(Handler should be a callable object) if handler not in self._handler_dict[topic]: self._handler_dict[topic].append(handler)
def unregister(self, topic: typing.Hashable, handler: typing.Callable): if handler in self._handler_dict[topic]: self._handler_dict[topic].remove(handler) if not self._handler_dict[topic]: del self._handler_dict[topic]
def notify(self, topic: typing.Hashable, msg): if topic in self._handler_dict: for handler in self._handler_dict[topic]: handler(msg)
class Subscriber(object):
def __init__(self): super(Subscriber, self).__init__() self._topic_handler_list = [] self._subject: Subject = None
def subscribe(self, topic: typing.Hashable, handler: typing.Callable): self._topic_handler_list.append((topic, handler)) self._subject.register(topic, handler)
使用时主要分两步:
subscribe
notify
别小看这简简单单的十来行代码,在后期的工程开发,这个抽象模板将帮你省下不少功夫。模块之间的耦合性大大降低,组件间的信息流动更加明确,开发者也可以将更多的精力放在业务逻辑处理之中。
参考:vnpy/EventEngine。
Position Manager是比较关键的一块,此处,笔者并没有直接使用其他开源项目进行改造,而是自己动手重新设计,加入两组概念。
第一组新的概念是Position和Portfolio。Position是记录了一个合约上交易的盈亏,Portfolio则管理了多个Position对象。如前文所述,这里的Position要能支持多种资产(股票、外汇、电子货币)以及多种合约(现货、期货、掉期)。
Position
Portfolio
第二组概念 是base_asset、quote_asset,譬如对应外汇交易中,USD/HKD这样的合约交易中,USD就是base_asset、HKD是quote_asset。如果只进行股票、期货交易,可以略过这个设计,因为他们会使系统的复杂性升高,但相应的,你的系统也可以支持一些复杂的场景,例如外汇的三角套利。这里我们先略过其细节,等后续有机会讲实盘策略时再详述其设计。
base_asset
quote_asset
USD/HKD
USD
HKD
回到Position,其介面设计如下
from dataclasses import dataclass
@dataclass class PositionData: symbol: str = EMPTY_STRING amount: float = EMPTY_FLOAT total_value: float = EMPTY_FLOAT last_price: float = EMPTY_FLOAT cost: float = EMPTY_FLOAT total_pnl: float = EMPTY_FLOAT realized_pnl: float = EMPTY_FLOAT unrealized_pnl: float = EMPTY_FLOAT
def on_trade(self, trade: TradeData) pass
def update_market(self, price, dt): self.last_price = price self.datetime = dt self.calculate_unrealized_pnl()
def calculate_unrealized_pnl(self): pass
这里关键在于抽象介面on_trade,以及calculate_unrealized_pnl,分别负责计算委托成交时、以及实时市场价格变动时,组合的盈亏变动情况。
on_trade
calculate_unrealized_pnl
这样的抽象设计,就能使我们的程序不仅能处理现货交易(SpotPosition),即便是期权(OptionPosition)、掉期(SwapPosition)等复杂合约,也能顺利对接到系统之中,上层模块也便无需关心其中的实现细节,达到封装的效果。
SpotPosition
OptionPosition
SwapPosition
于是,一个最简单的均线交叉策略大概会长这个样子
class ShortableMACrossStrategy(CTAStrategyTemplate): def __init__(self, n_sma=13, n_lma=20, frequency=1h, stop_loss=-0.02): super(ShortableMACrossStrategy, self).__init__() self.n_sma = n_sma self.n_lma = n_lma self.frequency = frequency self.stop_loss = stop_loss
self._loss_control = MaxLossControl(self.stop_loss) self._loss_control.portfolio = self.portfolio self._close_deque = deque(max_len=n_lma + 1)
def on_bar(self, bar): super(ShortableMACrossStrategy, self).on_bar(bar)
# stop loss if self._loss_control.on_bar(bar): self.close_position(self.symbol, 2)
if bar.frequency == self.frequency and bar.symbol == self.symbol: array = self._close_deque array.append(bar.close) if len(array) != array.max_len: logger.debug("%ss data is not enough", self) return array = np.array(array) sma_line = talib.MA(array, self.n_sma) lma_line = talib.MA(array, self.l_sma)
# up cross if lma_line[-1] < sma_line[-1] and lma_line[-2] >= sma_line[-2]: self._full_trade(EnumOrderDirection.BUY)
# down cross if lma_line[-1] > sma_line[-1] and lma_line[-2] <= sma_line[-2]: self._full_trade(EnumOrderDirection.SELL)
至此,我们已经能用事件驱动的方式回测我们的策略,回测的结果也将更加符合真实情况。更重要的是,我们能复用这套框架,直接对接到我们的实盘交易系统中,保证策略「所测即所得」。
相同的,我们也先明确我们对实盘交易系统的需求所在。
第一个需求比较容易实现。我们可以直接将BackTestEngine替换成TradingEngine,负责管理多个策略。同时,我们也把这个TradingEngine抽象成一个Application,相同的层级还包括Scrawler实时数据爬取器,Monitor UI市场监控器等。
TradingEngine
Application
Scrawler
Monitor UI
在实现上,他们有一个相同点,那就是订阅并监听从上一层发过来的信息,原理即上文所介绍的订阅者模式。订阅各自关注的主题后,各个应用便能独立运行,也就是说,各个应用之前相互独立,同时与其依赖的底层关联度也极低,仅是信息订阅这一项而已,二者之间几乎没有耦合性,这能极大地方便我们开发的进程。(需求1)
于是,我们可以把关注点转移到这个应用们所依赖的这个」底层」,即交易系统的「交易服务中台」。
中台主要有两个核心组件, CEP(复杂事件处理引擎)+ OMS(订单管理系统)。
CEP,即Complex Event Processing,复杂事件处理系统,是真正意义上驱动整个系统运行的引擎。主要由如下三个组件构成
事件分发器,本质上也是一个Subject,负责将各个主题的分发出去,要稍微留意的是,单个事件的主题数量可多于1个。例如对于订单信息而言,我们一般赋予它两个主题:
EnumOrderStatus
(EnumOrderStatus, strategy_id)
strategy_id
信号计算器,负责将通过的信号计算出来,避免每个策略单独计算,浪费资源。通常的任务包括 - K线集成: 1分钟K --> N分钟K --> 小时K --> 日K - 成交量K线生成 - 期权greek计算
持久化引擎,负责将存储重要数据,并持久化到资料库中,同时具备实时查询功能。一般我们会将实时市场行情、订单委托、成交、命令请求等都存入到该引擎中。
技术层面上,推荐使用 ORM,这样能将技术细节的实现推到最后来完成,解耦系统与资料库的依赖关系。未来无论是连接MongoDB还是MySQL,只需切换数据连接器,便能顺滑过渡。
同时要注意的是,因为我们需要把数据持久化到资料库里,这就会涉及到I/O操作。I/O操作一般都要比内存操作慢N个数量级。为了保证系统运行的效率,我们一般会对此采取非同步操作。同样的处理也出现在后面要提及的向交易所发送交易请求。(需求3)
订单管理系统也由3个组件构成:
总风控和总仓位管理,将直接复用单策略里的风控、仓位管理对象。只是职责上,总风控将是更关注总体的仓位、订单风险,总仓位管理则实时汇总所有策略的仓位、盈亏情况。
这里我们更关注订单演算法引擎。实盘中有这样一个公式,利润 = 策略 + 交易,好的策略很重要,但好的交易方式也不可缺少,如何减少订单的冲击成本,如何获得市场的最佳价格,同样是一门学问。
利润 = 策略 + 交易
订单演算法引擎负责处理策略发来的订单请求,并按照设计好的演算法,将订单发送到交易所中。(需求2)
最常见的订单演算法,包括:
前3个演算法相对容易实现,提一下智能路由订单演算法。
这个演算法在电子货币交易领域中使用得很多,因为这个市场上的交易所实在是太多了,像ETH/BTC这样的合约,市场上有几十个,交易价格参差不齐。我们的目标是选择其中几个稳定的交易所,交易时,选择各个交易所中价格最优的进行挂单交易。
ETH/BTC
实现这个功能,需要我们要能快速地实时合并不同交易所订单簿,使得该合并订单簿能够:
抽象成演算法题,就是请你设计一个数据结构,使得查询O(1),插入O(1),删除O(1),排序O(N)的数据结构。显然,这不是单个数据结构就能解决的,需要多个数据结构之间互相配合才能实现。权且留作这篇文章的思考题吧,对未来的演算法面试会有帮助的。
回到订单结构的实现上,为了方便统一介面类型,我们可以将演算法订单整合到order_type中
order_type
class EnumOrderType(IntEnum): NONE = auto() LIMIT = auto() MARKET = auto() BLP = auto() VWAP = auto() TWAP = auto() STOP = auto() SOR = auto()
@dataclass() class OrderData(TradingBaseData): price: float = EMPTY_FLOAT volume: float = EMPTY_FLOAT
direction: EnumOrderDirection = EnumOrderDirection.NONE order_type: EnumOrderType = EnumOrderType.NONE status: EnumOrderStatus = EnumOrderStatus.NONE action: EnumOrderAction = EnumOrderAction.NONE
client_order_id: str = EMPTY_STRING order_id: str = EMPTY_STRING executed_volume: float = EMPTY_FLOAT
parameter: dict = field(default_factory=dict)
参考:vn.py/algoTrading
前台、中台都打通了,现在就差后台了,也就是底层真正和交易所进行连接的模块。
如图所示,这里主要有两个部分构成
Gateway,交易所对接网关,负责将各个交易所五花八门的介面统一起来,接入系统。每个Gateway至少需要包含以下职能:
Gateway
信息请求将包括 (1).合约下载 (2).交易请求 (3).信息查询
这里著重讲一下合约信息。合约信息是与交易所对接的关键,因为在发送订单等信息到交易所时,必须要严格符合该交易所的格式标准。例如订单发送时不能填写symbol(自身系统内的标的代表符号),而是symbol_exchange,对应到交易所系统的标的代表符号。其他的还有lot_size代表 每笔交易中交易量变动的最小量,tick_size代表每笔交易中价格变动的最小量。在发送订单之前,需要对这些量进行取整处理。
symbol
symbol_exchange
lot_size
tick_size
@dataclass class ContractData(MarketBaseData):
contract_type: str = EMPTY_STRING # 合约类型: 现货、期货、掉期、期权 symbol: str = EMPTY_STRING # 自身系统内的标的代表符号 symbol_exchange: str = EMPTY_STRING # 交易所系统的标的代表符号 exchange: str = EMPTY_STRING # 交易所
lot_size: float = EMPTY_FLOAT # 每笔交易中 交易量 变动的最小量 tick_size: float = EMPTY_FLOAT # 每笔交易中 价格 变动的最小量
max_price: float = MAX_FLOAT min_price: float = EMPTY_FLOAT
max_quantity: float = MAX_FLOAT min_quantity: float = EMPTY_FLOAT
max_notional: float = MAX_FLOAT min_notional: float = EMPTY_FLOAT
从这个合约介面就能看出,洗介面是一件相当繁琐的活,需要不断将各个值都对应上,而且往往还一个交易所一套介面类型。「行百里者半九十」,我们马上就能打通实盘交易了,不要惧怕这最后的脏活。
相似的,我们还需要把订单发送和查询的介面补充完整,请加油。
2. 信息订阅
上文的请求发送,一般是交易系统主动发出的,而信息订阅则是交易所主动推过来的。一般包括成交明细订阅、订单状态订阅、K线订阅、深度行情订阅。实现上,一般都是起一个线程单独监听对应的埠。例如电子货币交易中最常用的websockets介面。
websockets
在工程上的,交易所连接部分有2个注意事项:
Router,订单路由器,负责将订单发送到指定的交易所进行成交。(需求4)
实现上要做到的功能包括
- 多交易所:同时连接多个交易所,这是进行跨交易所套利的基础。
- 多账户:同时包含多个账户,这是对外提供交易服务的基础
- 多类型:可以设置不同的连接类型
如此一来,在策略回测完毕之后,可以先使用Simu trading模式,比对两者结果,确认策略无bug、系统无风险。之后,就可以上Paper trading,不使用真实账户资金,观察实盘效果。如果效果良好,那么,就切换到真正的实盘模式,加入自营资金,跑起策略来了。
实现上也很简单,我们将定义多个Router的子类。(需求5)
现在,我们的实盘交易系统差不多就建好了。当然,部署和运维也是一大块,后期也还需要不断维护和迭代。
一如既往,我们还是需要分析一下系统,了解其中的不足之处。
首先是关于系统的性能方面,系统的瓶颈将会出现在3个地方:
1+1
queue
multi-producer, multi-consumer
其次则是缺少一个统一的、标准化的、适合团队协同工作的解决方案。
如果这些环节不完善的话,那么整个交易系统也只是一堆花哨却无意义的代码。要与团队相结合,运作流转起来,才能最大程度地发挥系统的效能。
优秀的你对这两个方向一定也有很多自己的想法,所以请继续关注我的下篇文章,我们到时将一起深入探讨更快的技术和更有效率的解决方案。
回头看来,我们不难发现,这个交易系统已经不再只是简单的程序包了,而是一个完整的工程项目。工程项目的管理也是一门艺术,有不少的工程规则和知识,这里和大家分享一下这一路的心得
这些是都是软体工程中的基本知识,但每一个展开都有极深的内容,点点滴滴,相信在搭建系统的过程中,我们会慢慢积累、慢慢成长。这也算是造这个交易系统轮子的意义之一。
听到这,或许你会这样反问道:「太CS了吧...」
确实如此。你大可用其他的、任何你觉得合适的现成平台进行策略开发和交易。但原则上,你要能保证自己的发展不会被限制在某个在线平台、或者某个公司里。你的价值不会因为你的离职或者网站的关闭而减损。
对于一个Quant来说,当他受制于人的那一刻,就已经注定了是一场悲剧。
我的观点是:要一直刻意地训练自己,保持自己的竞争力。有独立的思考、也有扎实的技术和执行力,有能力随时验证自己的想法并将其执行落地,不被外界事物限制成长。