一个品种多个策略要跑怎么办——python多进程数据分发解决方案分享

Author: 王木木0015, Created: 2021-11-29 19:17:08, Updated: 2023-11-22 20:35:47

img

一、起因

我在部署实盘时,用一个期货公司账户,交易螺纹和热卷两个品种的5分钟线,而每个品种下又要跑6个不同setting的策略。 比如螺纹的6个setting,使用的都是同一个数据(螺纹主力,5分钟k线),此时我有几个选择

(a)开6个机器人,每个机器人单独的收集数据和跑策略

(b)开1个机器人,不用多进程,主程序收集数据,当新的5分钟线ready的时候,用for循环遍历各个策略

(c)开1个机器人,用一个进程收集数据,当新的5分钟线ready的时候,把数据同时分发给6个策略(6个进程)

先说(a),且不说6个机器人的费用,交易所会限制查询和交易的间隔,这么做可能会导致部分机器人数据更新的不及时;而且中间存在6次重复的查询,没必要。 再看(b),for循环中计算指标和开仓要依次进行,这样6个setting有的就要慢一拍;而且你如果只用主程序串行执行代码,那开仓可以5分钟一次,止损难道也是5分钟一次吗,显然不合理。 而(c)这个虽然分发过程中也会存在耗时,但是相比前两个应该会少很多。

二、尝试

关于分发的方法,我考虑了几种可能,并尝试了下:

解决方案一:

【方法】:内存共享,数据收集进程每次修改共享内存的数据,其他6个策略进程则依次访问这块内存; 【尝试结果】:实现过程中,作为程序员觉得太不优雅,数据收集进程对于这块内存有读写权限,其他6个策略进程明明只需要写入权限;可是python里面这块内存不论读写都要lock住;而且没有阻塞功能,需要自己判断数据是否更新了;写着写着,越来越无法承受写出这种不优雅代码的负罪感;作罢

解决方案二:

【方法】:网络通信,数据收集进程创建一个http协议,然后其他6个策略进程不断访问这个http接口,并且判断是否数据更新了; 【尝试结果】:这个代码实现倒是非常简单,但是显而易见的——都在本机了还开本地网络接口传输干什么,想把自己慢死吗?

解决方案三:

【方法】:多进程的数据通信 【尝试结果】:没接受到数据会自动阻塞,分发速度较快,不存在多进程访问同一份内存数据的隐患;尝试了几种多进程通信的方法(Pipe,Queue,Manager),测试了下速度,最后选择了最快的Pipe。参考这篇知乎文章

三、最终方案分享

这里贴一下多线程分发的最小实现代码,在本机和服务器的测速中,发送下面的这个330*10的一个pandas的dataframe,一个数据的发送到接受用了1-2ms。由于是依次发送,发完全部16个最大延迟5ms;对于我5分钟的策略完全够了,毕竟ticker也得500ms一个。高频就算了,高频用什么python

import datetime
import threading
import numpy as np
import pandas as pd
from multiprocessing import Process, Pipe

# 数据收集进程,把自己写的数据查询收集的代码该进去;一旦数据ready就分发出去
def data_produce(pipe_list, pid):
    data = np.ones((330, 10))
    data = pd.DataFrame(data)
    cnt = 0
    while 1:
        cnt += 1
        if cnt>10: exit(0)
        time.sleep(0.5)
        for p in pipe_list:
            p.send({'data':data, 'now':datetime.datetime.now().time()})

# 策略代码,接收新的数据;没接收到时会阻塞在pipe.recv()这里,收到后执行策略代码,开仓闭仓等
def data_analyse(pipe, pid):
    while 1:
        rec = pipe.recv()
        now = datetime.datetime.now().time()
        # 看下第一个和最后一个进程的发送接收延迟
        if pid in ['analyse0', 'analyse15']:
            print(rec['now'], now, "received", rec['data'].shape, pid)

if __name__ == "__main__":

    num = 16
    sends = []
    recs = []
    processes = []
    # 这里是16个策略,共用同一个品种数据;所以创建16个数据分发的Pipe,每个Pipe有一个发送器和一个接收器
    for i in range(num):
        receive, send = Pipe()
        sends.append(send)
        recs.append(receive)
        
    # 只有一个数据收集进程,把16个数据分发Pipe的发送器交给这个进程
    processes.append(Process(target=data_produce, args=(sends, 'poducer')))
    # 我们有16个策略进程,把16个数据分发Pipe的接收器依次交给每一个策略进程
    for i in range(num):
        processes.append(Process(target=data_analyse, args=(recs[i], 'analyse{}'.format(i))))     
    # 跑吧
    for p in processes:
        p.start()
    for p in processes:
        p.join()

四、其他

1、我的实际代码中,是每个ticker都会分发一次,毕竟判断止损要时时刻刻判断;这个看需求随意设置了 2、如果不想出现“一核有难,七核围观”的盛况的话;python最好用多进程multiprocessing,而不是多线程threading;python的threading只是多个线程轮流调用一个进程的cpu资源而已。


更多内容