在FMZ平台搭建多因子模型(二):展期收益率单因子模型

Author: ianzeng123, Created: 2023-11-10 09:26:31, Updated: 2024-02-28 21:50:30

img

在商品期货市场,多因子模型是用于解释价格波动和预测市场走势的重要工具。多因子模型的一种简单模式就是单因子模型。本节课程将深入介绍单因子模型,并以展期收益作为单因子来探讨其概念、计算方法以及如何搭建模型筛选多空品种。作为多因子模型的基础,理解单因子模型是掌握多因子模型的关键步骤。

  1. 单因子模型:多因子模型的基础

上节课我们介绍了多因子模型的具体概念。它假设资产价格的变动是由多个因子引起的,如市场因子、价值因子、动量因子等。对于我们初始来说,一开始就搭建这么复杂的模型肯定是不适合的。所以我们决定从单因子模型入手。单因子模型则是多因子模型的最基本形式,只考虑一个因子对资产价格的影响。在这节课程中,我们将介绍展期收益作为单因子,探讨如何使用它来分析期货市场。

  1. 展期收益是什么?

这节课我们要使用的单因子是展期收益。展期收益(Roll Yield)是期货市场中的一种单因子,表示期货合同的价格与现货市场价格之间的差异。它通常反映了市场对未来价格变动的预期。当对某品种未来价格比较悲观的时候,近期期货价格高于远期期货价格时,展期收益为正,称为远月合约贴水,或现货升水;相反比较乐观的时候,近期期货价格低于远期期货价格,展期收益为负,称为远月合约升水,或现货贴水。

展期收益的计算通常针对不同交割日期的期货合同,以分析市场对不同交割日期的价格预期。展期收益可以通过以下简单的公式来计算:

展期收益(年度) = Log(近期合约期货价格/ 远期合约期货价格)/间隔月份 * 12

展期收益率有一个特性——单调性,强者恒强,弱者恒弱。按照专业机构提供的交易逻辑:不同合约的展期收益率排名可在一定程度上体现“多强空弱”,这里的“强”、“弱”概念可以认为是目前该品种趋势的判断,所以我们的交易逻辑就是从展期收益率的角度做到“顺势而为”。按照“多强空弱”中的理念,我们做多展期收益率最高的品种,做空展期收益率最低的品种,从而根据展期收益率排名构建相应的交易策略。

以上呢,就是展期收益率的介绍。当然我们的重点还是通过这个策略是实现单因子模型,我们来看怎样使用代码进行实现。

这个单因子策略是作为多因子模型准备的,因此各个模块函数需要保持复用性,就是在多因子模型中也可以稳定使用。所以呢,这次我们决定使用模板类库的方式,构建各个模块函数,这样呢,即使我们使用单因子或者多个因子去尝试因子的有效性,都是可以的。

第一个模块我们来补充getTarList函数,但是需要额外注意的一点是,这里我们不仅要获取主力合约,还要获取次主力合约,所以这里面定义mainList代表主力合约列表,nextList定义次主力合约列表。然后我们使用轮询在代码和具体主力月份的字典里,找到当前的主力合约代码和次主力合约代码,首先获取当前时间,使用“_D()”,然后获取当前的年份和月份,接着就要处理不同的情况了。当月份是12月的时候,我们更换主力合约,因为下一年的第一个主力合约即将到期,所以我们更换主力合约为下一年的第二个主力,次主力呢,更换为下一年的第三个。这里面需要对大小写进行不同的处理,如果是大写,年份保留2位,大写保留1位。然后针对于第一个主力到第二个主力,第二个到第三个,到三个到12月份,我们进行轮换的处理,这种方法确实比较复杂一点,但是理解起来更加简单。有的同学可能会询问为什么不使用代码加上888,然后获取代码的instrumentid呢,因为系统的主力合约是根据交易量和持仓量决定的,所以有时候非主力合约会成为不在列表里的主力合约,造成次主力合约判断出现错误。

# 获取主力/次主力合约代码
def getTarList():
    mainList = [] # 主力合约列表
    nextList = [] # 次主力合约列表

    for commodity, monthList in commodity_contracts.items():

        curTime = _D()
        curYear = curTime[2:4]
        curMonth = curTime[5:7]

        if int(curMonth) == 12:
            if commodity.isupper():
                mainID = commodity + str(int(curYear[1])+1) + str(monthList[1])
                nextID = commodity + str(int(curYear[1])+1) + str(monthList[2])
            else:
                mainID = commodity + str(int(curYear)+1) + str(monthList[1])
                nextID = commodity + str(int(curYear)+1) + str(monthList[2])

        elif int(curMonth) >= int(monthList[0]) and int(curMonth) < int(monthList[1]) - 1:

            if commodity.isupper(): 
                mainID = commodity + str(curYear[1]) + str(monthList[1])
                nextID = commodity + str(curYear[1]) + str(monthList[2])
            else:
                mainID = commodity + str(curYear) + str(monthList[1])
                nextID = commodity + str(curYear) + str(monthList[2])
        elif int(curMonth) >= int(monthList[1]) - 1 and int(curMonth) < int(monthList[2]) - 1:
            if commodity.isupper(): 
                mainID = commodity + str(curYear[1]) + str(monthList[2])
                nextID = commodity + str(int(curYear[1])+1) + str(monthList[0])
            else:
                mainID = commodity + str(curYear) + str(monthList[2])
                nextID = commodity + str(int(curYear)+1) + str(monthList[0])

        elif int(curMonth) < 12:
            if commodity.isupper(): 
                mainID = commodity + str(int(curYear[1])+1) + str(monthList[0])
                nextID = commodity + str(int(curYear[1])+1) + str(monthList[1])
            else:
                mainID = commodity + str(int(curYear)+1) + str(monthList[0])
                nextID = commodity + str(int(curYear)+1) + str(monthList[1])

        mainList.append(mainID)
        nextList.append(nextID)
    
    return [mainList, nextList]

第二个getMainData,这个函数功能是获取不同列表,不同周期的k线数据,所以这里增加两个参数,周期和合约列表。这里呢,在使用getTarList函数后,第一个索引就是主力合约的列表,第二个索引是次主力合约的列表,作为参数放入这个函数当中,然后使用轮询的方式,获取倒数第二条记录,是完成的k线,添加进入DataFrame当中,进行返回就可以获取到主力和次主力的k线数据。

# 获取主力日级别k线,方便后续因子计算

def getMainData(PERIOD, contractList):

    df = pd.DataFrame(columns=["Instrument", "InstrumentId", "Time", "Open", "High", "Low", "Close", "Volume"])
    
    for codeID in contractList:

        info = exchange.SetContractType(codeID)

        code = re.search(r'([A-Za-z]+)\d+', codeID).group(1)
        records = exchange.GetRecords(PERIOD)
        latest_record = records[-2]  # 获取倒数第二条记录

        curR = {
            "Instrument": code,
            "InstrumentId": codeID,
            "Time": latest_record['Time'],
            "Open": latest_record['Open'],
            "High": latest_record['High'],
            "Low": latest_record['Low'],
            "Close": latest_record['Close'],
            "Volume": latest_record['Volume']
        }

        # 将字典转换为DataFrame,并添加到主DataFrame中
        new_row = pd.DataFrame([curR])
        df = df.append(new_row, ignore_index=True)

    return df

在获取完成主力和次主力k线数据以后,第三个板块就是要计算展期收益率了。展期收益率的公式是这样的,所以我们也使用轮询的方式对上两个板块中获取的数据,针对于不同的合约进行依次的计算。这里需要注意的是,针对于间隔月份的计算,因为同处于一年的主力合约,可以直接使用两个月份之差进行计算,如果跨年,那么次主力的月份就要加上12。

# 计算展期收益率

import math

def calRollover():
    
    mainData = getMainData(PERIOD_D1, getTarList()[0])
    nextData = getMainData(PERIOD_D1, getTarList()[1])

    rolloverList = []  # 用于存储展期收益率的列表

    for i in range(len(mainData)):
        mainLine = mainData.iloc[i]
        nextLine = nextData.iloc[i]

        mainFuture = mainLine['InstrumentId']
        nextFuture = nextLine['InstrumentId']
        
        mainDate = re.findall(r"\d+\d*", mainFuture)
        nextDate = re.findall(r"\d+\d*", nextFuture)

        if int(nextDate[0][-2:]) > int(mainDate[0][-2:]):
            diffMonth = int(nextDate[0][-2:]) - int(mainDate[0][-2:])
        else:
            diffMonth = int(nextDate[0][-2:]) + 12 - int(mainDate[0][-2:])

        rollOver = math.log(mainLine['Close'] / nextLine['Close']) / diffMonth * 12

        rolloverList.append(rollOver)

    rolloverDf = pd.DataFrame({
        "InstrumentId": mainData['InstrumentId'],
        "RollOver": rolloverList
    })

    return rolloverDf

这里因为只有一个因子,所以因子处理和因子合成的板块这里暂缺。

多空组判断groupFactor,这里定义两个参数,factorDf因子的dataframe,factor代表具体使用的因子。这里只有展期收益的因子,根据正负性分为做多组和做空组。

def groupFactor(factorDf, factorName):

    positive_df = factorDf[factorDf[factorName] > 0].sort_values(by=factorName)
    negative_df = factorDf[factorDf[factorName] < 0].sort_values(by=factorName)

    positive_codes = positive_df['InstrumentId'].tolist() # 做多组
    negative_codes = negative_df['InstrumentId'].tolist() # 做空组

    return[positive_codes, negative_codes]

对不同的品种进行分类完毕以后,我们就要进行交易的操作了。这里不仅仅是根据多空组的判断进行做多还有做空的操作。因为不同品种可能在在这两组之间进行切换,所以当品种切换组别的时候,我们进行平仓,在另外方向开仓的处理。

在获取仓位信息以后,在策略起始阶段,判断仓位列表长度为0,直接根据多空组的列表进行相应的操作。

后续判断持仓长度不为0,获取各个品种的持仓type,是多头还是空头,然后判断如果该品种不再做多组的时候,打印信息该品种需要被移动到做空组,首先进行多头的平仓,然后进行空头的开仓。这里为了交易的进行,都是设置了一个比较容易成交的价格。对于空头组的处理也是一致的,如果某品种不再属于空头组,进行空头平仓,然后就像多头开仓。

# 多空组买卖
def trade(pos_group, neg_group):
    posInfo = exchange.GetPosition()

    if len(posInfo) == 0:
        for i in range(len(pos_group)):
            exchange.SetContractType(pos_group[i])
            curPrice = exchange.GetRecords()[-1].Close + 5
            exchange.SetDirection('buy')
            exchange.Buy(curPrice, 1)
        
        for i in range(len(neg_group)):
            exchange.SetContractType(neg_group[i])
            curPrice = exchange.GetRecords()[-1].Close - 5
            exchange.SetDirection('sell')
            exchange.Sell(curPrice, 1)
        Log('多因子策略开始,建仓完成', "#FF0000")
    else:
        
        for i in range(len(posInfo)):
            if posInfo[i].Type == PD_LONG or posInfo[i].Type == PD_LONG_YD:

                if posInfo[i].ContractType not in pos_group:
                    Log('合约', posInfo[i].ContractType, '移仓至持空组')
                    exchange.SetContractType(posInfo[i].ContractType)
                    curPrice = exchange.GetRecords()[-1].Close - 5
                    exchange.SetDirection('closebuy')
                    exchange.Sell(curPrice, 1)

                    curPrice = exchange.GetRecords()[-1].Close - 5
                    exchange.SetDirection('sell')
                    exchange.Sell(curPrice, 1)
            else:
                if posInfo[i].ContractType not in neg_group:
                    Log('合约', posInfo[i].ContractType, '移仓至持多组')
                    exchange.SetContractType(posInfo[i].ContractType)
                    curPrice = exchange.GetRecords()[-1].Close + 5
                    exchange.SetDirection('closesell')
                    exchange.Buy(curPrice, 1)

                    curPrice = exchange.GetRecords()[-1].Close + 5
                    exchange.SetDirection('buy')
                    exchange.Buy(curPrice, 1)

还有最后一个板块,移仓的处理。首先获取主力合约列表,获取字母代码部分,方便我们根据品种的代码找到最新的主力合约,然后获取仓位的信息,如果仓位列表长度不为0,使用轮询的的方式,判断如果该品种不再主力列表当中,同样获取该品种的代码部分,对应索引找到该品种的主力合约。然后就进行平掉旧合约,开仓新合约,这里需要品种的type进行,保持该品种的多头和空头趋势。然后再次获取一下仓位的信息,如果都是主力合约,打印信息进行说明。

我们单因子模型需要的模块函数就定义完成了,然后我们使用ext加上函数名称的格式,定义这些模块函数,这样在我们就可以在策略编写中使用这个模版类库的交易函数了。下面呢,我们就进行一下测试。

# 移仓
def posTrans():

    mainList = getTarList()[0]
    
    codeList = [''.join(filter(str.isalpha, item)) for item in mainList]

    prePos = exchange.GetPosition()

    if len(prePos) != 0:
        for i in range(len(prePos)):
            if prePos[i].ContractType not in mainList:
                mainCode = re.search(r'([A-Za-z]+)\d+', prePos[i].ContractType).group(1)
                index = codeList.index(mainCode)
                mainID = mainList[index]

                Log('旧合约', prePos[i].ContractType, '需要被更换为', mainID)
                if prePos[i].Type == PD_LONG or prePos[i].Type == PD_LONG_YD:
                    # 平掉旧合约
                    exchange.SetContractType(prePos[i].ContractType)
                    curPrice = exchange.GetRecords()[-1].Close - 5
                    exchange.SetDirection('closebuy')
                    exchange.Sell(curPrice, 1)

                    # 开仓新合约
                    exchange.SetContractType(mainID)
                    curPrice = exchange.GetRecords()[-1].Close + 5
                    exchange.SetDirection('buy')
                    exchange.Buy(curPrice, 1)

                else:
                    exchange.SetContractType(prePos[i].ContractType)
                    curPrice = exchange.GetRecords()[-1].Close + 5
                    exchange.SetDirection('closesell')
                    exchange.Buy(curPrice, 1)

                    exchange.SetContractType(mainID)
                    curPrice = exchange.GetRecords()[-1].Close - 5
                    exchange.SetDirection('sell')
                    exchange.Sell(curPrice, 1)

    # 移仓完成后再次判断
    afterPos = exchange.GetPosition()
    if len(prePos) != 0:
        all_in_main_list = all(afterPos[i].ContractType in mainList for i in range(len(afterPos)))
        if all_in_main_list:
            Log('所有合约都是主力合约', "#00FF00")

我们设置一个策略样例进行检验,固定框架安装上,首先第一步进行展期收益率的计算,然后划分做多组和做空组,在进行交易之前,首先进行移仓的处理,确保最新的仓位都是主力合约,接着进行对应的交易操作。这样呢,就是一个单因子模型的主体交易部分。当然为了实盘的运行,我们需要加上策略图表的展示,还有收益的展示。

这里比较特殊的一点是,因为我们想固定天数间隔运行一次策略,而策略的实时收益展示,我们是想及时更新的,所以这里计算了主操作的时间间隔,如果主操作的间隔大于固定周期,进行主策略的运行;而策略的实时展示,是一直在进行的。

设置休眠时间是10天,我们试着运行一下。在回测结果里,可以看到,首先进行建仓的处理,然后每隔10天,进行一下交易的操作,在品种对应组别更换的时候,进行相应的平仓还有相反方向开仓的处理。当接近主力合约更换的月份,我们进行移仓换月的处理。

def main():

    # 记录上次执行的时间
    last_operation_timeBig = 0
    isFirst = True
    initAccount = exchange.GetAccount()

    while True:
        current_timeBig = int(time.time())
        
        # 获取时间间隔(秒)
        if isFirst:
            time_intervalBig = 24 * 60 * 60 * 10
            isFirst = False
        else:
            time_intervalBig = current_timeBig - last_operation_timeBig

        # 执行多空组买卖之前的操作(每隔10天执行一次)
        if time_intervalBig >= 24 * 60 * 60 * 10:
            
            if exchange.IO("status"):
                # 移仓
                ext.posTrans()

                # 计算展期收益率
                callRollover = ext.calRollover()
                
                # 划分组
                positive_codes = ext.groupFactor(callRollover, 'RollOver')[0]
                negative_codes = ext.groupFactor(callRollover, 'RollOver')[1]

                # 多空组买卖
                ext.trade(positive_codes, negative_codes)

                last_operation_timeBig = current_timeBig

            else:
                Sleep(1)
            
        positions = exchange.GetPosition()

        if len(positions) != 0:
            longContract = []
            longPrice = []
            longProfit = []

            shortContract = []
            shortPrice = []
            shortProfit = []

            for i in range(len(positions)):
                if (positions[i]['Type'] == PD_LONG) or (positions[i]['Type'] == PD_LONG_YD):
                    longContract.append(positions[i]['ContractType'])
                    longPrice.append(positions[i]['Price'])
                    longProfit.append(positions[i]['Profit'])
                else:
                    shortContract.append(positions[i]['ContractType'])
                    shortPrice.append(positions[i]['Price'])
                    shortProfit.append(positions[i]['Profit'])

            tblAStatus = {
                "type" : "table",
                "title" : "持多组",
                "cols" : ["合约名称", "持仓均价", "持仓盈亏"],
                "rows" : [] 
            }

            tblBStatus = {
                "type" : "table",
                "title" : "持空组",
                "cols" : ["合约名称", "持仓均价", "持仓盈亏"],
                "rows" : [] 
            }

            for i in range(len(longContract)):
                tblAStatus["rows"].append([longContract[i], longPrice[i], longProfit[i]])

            for i in range(len(shortContract)):
                tblBStatus["rows"].append([shortContract[i], shortPrice[i], shortProfit[i]])

            lastStatus = f"`{json.dumps(tblAStatus)}`\n`{json.dumps(tblBStatus)}`"

            curAccount = exchange.GetAccount()

            curProfit = curAccount.Balance - initAccount.Balance    
            LogProfit(curProfit, "&")

            LogStatus(lastStatus)

        Sleep(1)

本节的课程呢,是一个单因子的示范策略。在后续课程中,我们将继续探讨多因子模型,包括如何选择和组合不同的因子,以构建更强大的模型来分析和预测商品期货市场的走势。了解单因子模型是深入理解多因子模型的第一步,当然后续课程的难度也会逐步的增加,让我们慢慢来。

视频参考链接:

《期货市场的免费午餐?期限结构Carry收益模型分享》

本系列课程旨在为大家介绍多因子模型在商品期货量化交易中的应用,其他相关文章请点击下面链接:


更多内容