示例策略

默认模板(框架)

"""
默认策略
示例策略仅供参考,不建议直接实盘使用。
"""
from zltquant import *
# 必须
def initialize(context):
    # 设置基准收益:沪深300指数
    set_benchmark('sh000300')
    # 打印日志
    print('策略开始运行,初始化函数全局只运行一次')
    # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
    set_order_cost(OrderCost(open_tax=0, close_tax=0.001, open_commission=0.0003, close_commission=0.0003, close_today_commission=0, min_commission=5), type='stock')
    # 为全部交易品种设定固定值滑点
    set_slippage(FixedSlippage(0.02))
    # 设置要操作的股票池
    g.security = []
    # 回测区间、初始资金、运行频率请在下方设置
# 每日开盘前9:00被调用一次,用于储存自定义参数、全局变量,执行盘前选股等
def before_trading(context):
    # 获取日期
    date = context.current_dt.strftime('%Y-%m-%d %H:%M:%S')
    # 打印日期
    print('{} 盘前运行'.format(date))
def handle_data(context):
    # 获取时间
    time = context.current_dt.strftime('%Y-%m-%d %H:%M:%S')
    # 打印时间
    print('{} 盘中运行'.format(time))
# 收盘后运行函数,用于储存自定义参数、全局变量,执行盘后选股等
def after_trading(context):
    # 获取时间
    time = context.current_dt.strftime('%Y-%m-%d %H:%M:%S')
    # 打印时间
    print('{} 盘后运行'.format(time))
    print('一天结束')
# 必须
if __name__ == '__main__':
    run_main()

均线择时

import pandas as pd
from zltquant import *

def initialize(context):
    # 设置要操作的股票
    g.security = 'sz000001'

def handle_data(context):
    short = 12
    long = 26
    mid = 9

    # 获取历史数据
    bar = get_price(g.security, end_date=context.current_dt, count=long + mid, frequency='1d', fq='pre', fill_suspend=True)
    close = pd.Series(bar['close'])

    # 计算EMA
    ema_short = close.ewm(span=short, adjust=False).mean()
    ema_long = close.ewm(span=long, adjust=False).mean()

    # 计算DIFF和DEA
    diff = ema_short - ema_long
    dea = diff.ewm(span=mid, adjust=False).mean()

    # 计算MACD
    macd = (diff - dea) * 2
    update_cash_info()
    # 金叉买入
    if diff.iloc[-1] > dea.iloc[-1] and macd.iloc[-1] > macd.iloc[-2] and g.security not in context.portfolio.positions.keys():
        # 记录这次买入
        order_value(g.security, context.portfolio.available_cash)

    # 死叉卖出   
    elif diff.iloc[-1] < dea.iloc[-1] and macd.iloc[-1] < macd.iloc[-2] and g.security in context.portfolio.positions.keys():
        # 记录这次卖出  
        order_target(g.security, 0)

if __name__ == '__main__':
    run_main()

布林线均值回归

"""
- 当股票价格触及布林线上轨且有持仓时, 卖出股票.
- 当股票价格触及布林线下轨且无持仓时, 买入股票.
"""
from zltquant import *

# 初始化函数,设定策略参数
def initialize(context):
    # 设置回测频率为每日
    set_benchmark('sh000300')  # 设置基准为沪深300指数
    # 设置股票池
    g.stock = 'sh600000'  # 这里平安银行
    # 设置布林线参数
    g.window = 20  # 布林线的窗口期
    g.num_std = 2  # 标准差倍数
    # 设置定时器,每天开盘时运行
    run_daily(trade, time='09:31')

# 交易函数
def trade(context):
    # 获取股票的历史数据
    dbar1 = get_price(g.stock, end_date=context.current_dt, count=g.window, fq="pre")
    stock_data = pd.Series(dbar1['close'])  # 将 close 数据转换为 pandas.Series

    # 计算布林线
    mean = stock_data.rolling(window=g.window).mean().iloc[-1]  # 中轨(移动平均)
    std = stock_data.rolling(window=g.window).std().iloc[-1]    # 标准差
    upper_band = mean + g.num_std * std  # 上轨
    lower_band = mean - g.num_std * std  # 下轨    
    # 获取当前价格
    current_price = stock_data.iloc[-1]
    # 获取当前持仓
    current_position = context.portfolio.positions[g.stock].total_amount

    # 交易逻辑
    if current_price > upper_band and current_position > 0:
        # 如果当前价格高于上轨且有持仓,则卖出
        order_target(g.stock, 0)
        log.info("卖出股票:%s" % g.stock)
    elif current_price < lower_band and current_position == 0:
        # 如果当前价格低于下轨且无持仓,则买入
        order_value(g.stock, context.portfolio.available_cash)
        log.info("买入股票:%s" % g.stock)
    update_cash_info()

# 收盘后运行函数
def after_trading_end(context):
    # 打印每日收盘后的信息
    log.info("每日收盘后运行")
    pass
if __name__ == '__main__':
    run_main()

多因子选股

from zltquant import *
import datetime

# 初始化函数,设定策略参数
def initialize(context):
    # 设置选股和交易频率,每天运行一次
    run_daily(select_and_trade, time='every_bar')

# 选股和交易函数
def select_and_trade(context):
    # 1. 选股逻辑
    selected_stocks = select_stocks(context)

    # 2. 更新持仓股票的被选中状态
    buy_list, sell_list = update_order_list(context,selected_stocks)

    # 3. 交易逻辑
    trade(context, buy_list, sell_list)

# 选股函数
def select_stocks(context):
    # 获取所有股票列表(创业板)
    stock_list = get_index_stocks('sz399006',date=context.current_dt)

    # 初始化选股结果
    selected_stocks = []

    # 遍历所有股票
    for stock in stock_list:
        # 获取过去60个交易日的收盘价
        hist = get_price(stock, end_date=context.current_dt, frequency='1d', fields=['close'], count=60, fq='pre',df=True)
        # print(hist)

        # 计算波动率(标准差)
        std_dev = hist['close'].std()

        # 计算价格的均值
        mean_price = hist['close'].mean()

        # 计算归一化波动率
        normalized_volatility = std_dev / mean_price

        # 计算5日均线和20日均线
        short_ma = hist['close'].rolling(window=5).mean().iloc[-1]
        long_ma = hist['close'].rolling(window=20).mean().iloc[-1]

        # 获取过去20个交易日的日均成交额
        hist_volume = get_price(stock, end_date=context.current_dt, frequency='1d', fields=['money'], count=20, fq='pre',df=True)
        avg_turnover = hist_volume['money'].mean()      
        # 筛选条件:
        # 1. 归一化波动率大于某个阈值
        # 2. 短期均线大于长期均线
        # 3. 日均成交额大于某个阈值
        if normalized_volatility > 0.2 and short_ma > long_ma and avg_turnover > 100000000:
            selected_stocks.append(stock)
    # 输出选股结果
    log.info('选中的股票列表:{}'.format(selected_stocks))

    return selected_stocks

# 更新持仓股票的被选中状态
def update_order_list(context,selected_stocks):
    buy_list = []
    sell_list = []
    # 获取当前持仓的股票
    long_positions = context.portfolio.positions

    # 遍历当前持仓的股票
    for stock in long_positions:
        if stock not in selected_stocks:
            # 如果持仓股不在选中列表中,标记卖出
            sell_list.append(stock)
    for stock in selected_stocks:
        if stock not in long_positions:
            # 如果选中的股票不在持仓列表中,标记买入
            buy_list.append(stock)
    return buy_list, sell_list

# 交易函数
def trade(context, buy_list, sell_list):
    # 1. 卖出未被选中的持仓股票
    for stock in sell_list:
        order_target(stock, 0)  # 卖出全部持仓
        update_cash_info()
        log.info('卖出股票:{}'.format(stock))

    # 2. 买入新选中的股票
    #  分配资金      
    if len(buy_list) != 0:
        cash = context.portfolio.available_cash / len(buy_list)
        for stock in buy_list:
            # 如果股票被选中且未持仓,买入
            order_target_value(stock, cash)  # 等权重买入
            update_cash_info()
            log.info('买入股票:{}'.format(stock))

if __name__ == '__main__':
    run_main()

趋势跟踪

import pandas as pd
from zltquant import *
import numpy as np

# 初始化函数,设定策略参数
def initialize(context):
    # 设置标的
    g.symbol = 'sz000001'
    #set_benchmark(g.symbol)
    # 储存前一个所处区间
    g.last_grid = 0
    # 以前一交易日的收盘价为中枢价格
    g.center = get_previous_close_price(context)
    # 记录上一次交易时区间的变化情况
    g.grid_change_last = [0, 0]
    # 设置价格区间
    g.band_width = update_band_width(context)
    g.band = [g.center + i * g.band_width for i in range(-3, 4)]
    # 定时运行,每分钟判断一次
    run_daily(cut_loss, time='every_bar')
    run_daily(check_grid, time='every_bar')
# 获取前一交易日的收盘价
def get_previous_close_price(context):
    # 获取前一个交易日的收盘价
    hist = get_price(security=g.symbol, end_date=context.current_dt.strftime("%Y-%m-%d %H:%M:%S"), frequency='1d',fields=['close'], count=1, fq='pre')

    print(hist)
    return hist['close'][0]

def update_band_width(context):
    hist = get_price(security=g.symbol, end_date=context.current_dt.strftime("%Y-%m-%d %H:%M:%S"), frequency='1d',
                     fields=['close'], count=60, fq='pre')
    std_dev = hist['close'].std()
    return round(std_dev, 2)

# 交易逻辑
def check_grid(context):
    # 获取当前价格
    current_price = get_price(security=g.symbol, end_date=context.current_dt.strftime("%Y-%m-%d %H:%M:%S"),
                              frequency='1m', fields=['close'], count=1, fq='pre')['close'][0]
    # 获取当前持仓
    update_cash_info()
    position = context.portfolio.positions[g.symbol].total_amount if g.symbol in context.portfolio.positions else 0
    # 判断当前价格所处的价格区间
    grid = pd.cut([current_price], g.band, labels=[-3, -2, -1, 1, 2, 3])[0]

    # 止损

    # 如果价格超出价格区间,重置区间相关参数
    if np.isnan(grid):
        log.info('价格波动超过价格区间,可适当调节价格区间宽度和数量')
        # 储存前一个所处区间
        g.last_grid = 0
        # 以前一交易日的收盘价为中枢价格
        g.center = get_previous_close_price(context)
        # 记录上一次交易时区间范围的变化情况
        g.grid_change_last = [0, 0]
        # 设置价格区间
        g.band_width = update_band_width(context)
        g.band = [g.center + i * g.band_width for i in range(-3, 4)]

        return

    # 如果新的价格所处区间和前一个价格所处的区间不同,说明触碰到了辅助线,需要进行交易
    # 如果新区间大于前区间,全部卖出
    if g.last_grid < grid:
        if g.last_grid == 0:
            g.last_grid = grid
            g.grid_change_last = [g.last_grid, grid]
            return
        if g.last_grid != 0 and position > 0:
            current_price = get_price(security=g.symbol, end_date=context.current_dt.strftime("%Y-%m-%d %H:%M:%S"),
                                      frequency='1m', fields=['close'], count=1, fq='pre')['close'][0]
            order_target(g.symbol, 0, MarketOrderStyle(current_price * 0.98))
            log.info('以市价单卖出{}股'.format(position))
            log.info('上一个grid:{},当前grid:{},当前价格:{}'.format(g.last_grid, grid, current_price))
            g.last_grid = grid
            g.grid_change_last = [g.last_grid, grid]
            update_cash_info()

    # 如果新区间小于当前区间,买入
    if g.last_grid > grid:
        if g.last_grid == 0:
            g.last_grid = grid
            g.grid_change_last = [g.last_grid, grid]
            return
        if g.last_grid != 0 and position == 0:
            current_price = get_price(security=g.symbol, end_date=context.current_dt.strftime("%Y-%m-%d %H:%M:%S"),
                                      frequency='1m', fields=['close'], count=1, fq='pre')['close'][0]
            order_target_value(g.symbol, context.portfolio.available_cash * 0.9, MarketOrderStyle(current_price * 1.02))
            log.info('以市价单买入{} 元'.format(context.portfolio.available_cash * 0.9))
            log.info('上一个grid:{},当前grid:{},当前价格:{}'.format(g.last_grid, grid, current_price))
            g.last_grid = grid
            g.grid_change_last = [g.last_grid, grid]
            update_cash_info()

def cut_loss(context):
    # 当最新价*90%大于成本价,止损
    avg_cost = context.portfolio.positions[g.symbol].avg_cost if g.symbol in context.portfolio.positions else 0
    price = context.portfolio.positions[g.symbol].price if g.symbol in context.portfolio.positions else 0

    if avg_cost < price * 0.9:
        position = context.portfolio.positions[g.symbol].total_amount if g.symbol in context.portfolio.positions else 0
        order_target(g.symbol, 0, MarketOrderStyle(price * 0.98))
        log.info('触发止损线,以市价单卖出{}股'.format(position))
        update_cash_info()

if __name__ == '__main__':
    run_main()

均值回复策略

from zltquant import *
from datetime import *

def initialize(context):
    # 设定基准
    set_benchmark('sh000300')

    # 输出内容到日志 log.info()
    log.info('初始函数开始运行且全局只运行一次')

    run_daily(after_market_close, time='15:30')
    run_monthly(adjust_position, time='10:30', monthday=1)

    g.observe_days = 20       #收益率观察交易日数
    g.target_value = 10000    #每只股票金额
    g.stock_number = 10       #单方向股票数量

# 调仓
def adjust_position(context):
    stocks = get_index_stocks('sh000300')
    stocks = filter_stocks(context, stocks)
    df = get_price(stocks, count=g.observe_days, end_date=context.current_dt, frequency='1d', fields=['close'],fq='pre',df=True)

    #按历史收益率排序
    grouped = df.groupby('security').agg({
        'close': ['first', 'last']
    }).reset_index()
    grouped.columns = ['code', 'start_price', 'end_price']
    grouped['return'] = (grouped['end_price'] - grouped['start_price']) / grouped['start_price']
    grouped_sorted = grouped.sort_values(by='return', ascending=True)

    bad_list = grouped_sorted['code'][:g.stock_number]
    good_list = grouped_sorted['code'][-g.stock_number:]

    log.info('表现差的股票:%s' % bad_list)
    log.info('表现好的股票:%s' % good_list)

    # 均值回复策略
    new_long_list = bad_list
    new_short_list = good_list

    # 当前的组合仓位信息
    portfolio = context.portfolio
    long_positions = list(portfolio.positions.keys())
    log.info("positions", long_positions)

    # 多头平仓
    for stock in long_positions:
        if stock not in new_long_list:
            amount = portfolio.positions[stock].closeable_amount
            log.info("多头平仓,股票:%s" % stock)
            order(stock, -amount)
            update_cash_info()

    # 多头开仓
    for stock in new_long_list:
        if stock not in long_positions:
            log.info("多头开仓,股票:%s" % stock)
            order_target_value(stock, g.target_value)
            update_cash_info()

#过滤各种股票
def filter_stocks(context, stock_list):
    filtered_stocks = []
    for stock in stock_list:
        stock_code = stock[2:]
        if stock_code.startswith('30') or stock_code.startswith('68') or stock_code.startswith('8') or stock_code.startswith('4'):  # 市场类型
            continue
        start_date = get_security_info(stock).start_date   
        if context.previous_date - start_date < timedelta(days=375):
            continue
        filtered_stocks.append(stock)
    return filtered_stocks

## 收盘后运行函数
def after_market_close(context):
    p = context.portfolio
    log.info('- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -')
    log.info('总资产:', p.total_value)
    log.info('##############################################################')

if __name__ == '__main__':
    run_main()

海龟交易法则

from zltquant import *
import pandas as pd
import datetime
import math
from collections import deque

def initialize(context):
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    g.security = 'sh601012'
    set_benchmark(g.security)
    # 入市的trailing date
    g.in_day = 20
    # 出市的trailing date
    g.out_day = 10
    # 当前持仓的交易单位
    g.current_units = 0
    # 当天交易的交易单位
    g.today_units = 0
    g.current_units = 0
    g.N=deque(maxlen=19)
    g.current_N = 0
    g.last_buy_price = 0

    price = get_price(security=g.security, end_date=context.current_dt,count=g.N.maxlen*2+1,frequency= '1d',fields=['high', 'low', 'close'])
    print('ff',g.N.maxlen+1, g.N.maxlen*2+1)
    for i in range(g.N.maxlen+1, g.N.maxlen*2+1):
        li = []
        for j in range(i-19,i+1):
            a = price['high'][j]-price['low'][j]
            b = abs(price['high'][j]-price['close'][j-1])
            c = abs(price['low'][j]-price['close'][j-1])
            li.append(max(a,b,c))
        current_N = np.array(li).mean()
        g.N.append(current_N)

def before_trading_start(context):
    g.current_N = cal_N(context)
    g.today_units = 0

def handle_data(context):
    dt = context.current_dt
    data=get_current_data()
    current_price = data[g.security].last_price #上一分钟价格
    value = context.portfolio.total_value
    cash = context.portfolio.available_cash

    unit = math.floor(value * 0.01 / g.current_N)

    if g.current_units == 0:
        buy(context,current_price, cash, unit)
    else:
        if stop_loss(current_price):
            return
        if sell(current_price):
            return
        addin(current_price, cash, unit)

def cal_N(context):
    price = get_price(security=g.security, end_date=context.current_dt,count=2,frequency= '1d',fields=['high', 'low', 'close'])    
    a = price['high'][1]-price['low'][1]
    b = abs(price['high'][1]-price['close'][0])
    c = abs(price['low'][1]-price['close'][0])
    current_N = (max(a,b,c) + np.array(g.N).sum())/(g.N.maxlen+1)
    g.N.append(current_N)
    print('nnn',g.N)
    return current_N

def buy(context,current_price, cash, unit):
    price = get_price(security=g.security, end_date=context.current_dt, count=g.in_day,frequency= '1d',fields=['close'])    
    if current_price > max(price['close']):
        print('ppp',current_price ,price['close'])
        shares = cash / current_price
        if shares >= unit:
            print("buying %d" % unit)
            o = order(g.security, unit)
            o = get_orders()[o]
            g.last_buy_price = float(o.price)
            g.current_units += 1
            g.today_units += 1
            return True
    return False

def addin(current_price, cash, unit):
    if current_price >= g.last_buy_price + 0.5 * g.current_N:
        shares = cash / current_price
        if shares >= unit:
            print("adding %d" % unit)
            o = order(g.security, unit)
            o = get_orders()[o]
            g.last_buy_price = float(o.price)
            g.current_units += 1
            g.today_units += 1
            return True
    return False

def sell(current_price):
    price = attribute_history(g.security, g.out_day, '1d', ('close',))
    if current_price < min(price['close']):
        print("selling")
        order_target(g.security, 0)
        g.current_units = g.today_units
        return True
    return False

def stop_loss(current_price):
    if current_price < g.last_buy_price - 2 * g.current_N:
        print("stop loss")
        order_target(g.security, 0)
        g.current_units = g.today_units
        return True
    return False
if __name__ == '__main__':
    run_main()

小市值策略

# 小市值策略

from zltquant import *
import numpy as np
import pandas as pd
from datetime import time,date

def initialize(context):    
    # 设定基准    
    set_benchmark('sz399102')
    #初始化全局变量 bool    
    g.trading_signal = True  # 是否为可交易日    
    #全局变量list    
    g.hold_list = [] #当前持仓的全部股票        
    g.yesterday_HL_list = [] #记录持仓中昨日涨停的股票    
    g.target_list = []    
    g.limitup_stocks = []   # 记录涨停的股票避免再次买入    
    #全局变量float/str    
    g.min_mv = 10  # 股票最小市值要求    
    g.max_mv = 100  # 股票最大市值要求    
    g.stock_num = 4  # 持股数量    
    g.stoploss_list = []  # 止损卖出列表    
    g.other_sale    = []  # 其他卖出列表    
    g.highest = 50  # 股票单价上限设置    
    # 设置交易运行时间    
    run_daily(prepare_work, '9:05')    
    run_daily(trade_afternoon, time='14:20') #检查持仓中的涨停股是否需要卖出    
    run_weekly(weekly_adjustment,'09:31', 3) #每周三:31调仓

#准备工作
#每天早上9:05,更新昨日持仓股涨停股列表,更新今日是否可以开仓布尔变量
def prepare_work(context):    
    #获取已持有列表    
    g.limitup_stocks = []    
    g.hold_list = list(context.portfolio.positions)    
    #获取昨日涨停列表
    if g.hold_list:
        df = get_price(g.hold_list, end_date=context.current_dt, frequency='daily', fields=['close'], count=2, df=True, fill_suspend=False)
        for stock in g.hold_list:            
            if df.loc[df['security']==stock,'close'].iloc[0] >= df.loc[df['security']==stock,'close'].iloc[1] * 1.2: #昨日收盘价大于昨日收盘价的1.2倍,则为涨停股                
                g.yesterday_HL_list.append(stock)                
    else:        
        g.yesterday_HL_list = []

#选股模块
#选股,选出满足筛选条件的买入股票list
def get_stock_list(context):    
    final_list = []    
    MKT_index = 'sz399102'    
    initial_list = get_index_stocks(MKT_index)
    data = get_valuation_indicators(initial_list, ['market_cap'])  
    data = data[data['market_cap'] > g.min_mv * 100000000]  # 总市值大于10亿元
    data = data[data['market_cap'] < g.max_mv * 100000000]  # 总市值小于100亿元
    data = data.sort_values(by='market_cap', ascending=True)
    data = data.head(g.stock_num*3)
    stock_list = data['security'].tolist()
    data = get_fundamentals_pit(stock_list, ['is_parent_net_profit','is_net_profit','is_op_rev'])
    data = data[data['is_parent_net_profit'] > 0]
    data = data[data['is_net_profit'] > 0]
    data = data[data['is_op_rev'] > 1e8]
    final_list = data['security'].tolist()
    return final_list

#每周调仓函数   
def weekly_adjustment(context):    
    if g.trading_signal:#满足开仓条件        
        g.target_list = get_stock_list(context)[:g.stock_num]#应买的股票池        
        log.info(f"今日股票池:{str(g.target_list)}")        
        #首先确定哪些要卖,没在新预买池里的,没涨停的        
        sell_list = [stock for stock in g.hold_list if stock not in g.target_list and stock not in g.yesterday_HL_list]        
        #持仓表,就是要么涨停了不卖,要么就还在目标股池中        
        hold_list = [stock for stock in g.hold_list if stock in g.target_list or stock in g.yesterday_HL_list]        
        log.info("卖出[%s]" % (str(sell_list)))        
        log.info("已持有[%s]" % (str(hold_list)))                
        #先卖        
        for stock in sell_list:            
            order_target_value(stock, 0)        
        #后买
        buy_list = [stock for stock in g.target_list if stock not in g.hold_list]               
        buy_security(context, buy_list,len(buy_list))    
    else:        
        log.info('该月份为空仓月份')

#持仓涨停不卖出模块
def deal_up(context):    
    now_time = context.current_dt    
    if g.yesterday_HL_list != []:        
        #对昨日涨停股票观察到尾盘如不涨停则提前卖出,如果涨停即使不在应买入列表仍暂时持有       
        for stock in g.yesterday_HL_list:            
            current_data = get_price(stock, end_date=now_time, frequency='1m', fields=['close','high_limit'], skip_paused=False, fq='pre', count=1, df=True, fill_suspend=True)            
            if current_data.iloc[0,0] <    current_data.iloc[0,1]:#这两个是收盘价<最高价的意思  
                log.info("[%s]涨停打开,卖出" % (stock))                
                order_target_value(stock, 0)                
                g.other_sale.append(stock)                
                g.limitup_stocks.append(stock)            
            else:                
                log.info("[%s]涨停,继续持有" % (stock))

#下午检查交易,把持仓中涨停破板的卖出,继续涨停的继续持有
def trade_afternoon(context):    
    if g.trading_signal:#判断今日是否可以开仓        
        deal_up(context)  #涨停不卖出模块,把没涨停的持仓就卖了
#买入模块
def buy_security(context,target_list,num):    
    #调仓买入    
    position_count = len(context.portfolio.positions)    
    target_num = num    
    if target_num !=0:        
        value = context.portfolio.available_cash / target_num        
        for stock in target_list:            
            order_target_value(stock, value)            
            log.info("买入[%s](%s元)" % (stock,value))            
            if len(context.portfolio.positions) == g.stock_num:                
                break

if __name__ == '__main__':
    run_main()

股票配对交易

import scipy.stats as stats
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
import statsmodels
from statsmodels.tsa.stattools import coint
from zltquant import *

def initialize(context):
    # 设定上证50作为基准
    set_benchmark('sh000016')
    # 交易量设置  开仓量不超过当前25%
    set_option('order_volume_ratio', 0.25)

    g.threadhold = 0.02        # 阈值
    g.before_high_stock = None    # 之前的股票
    g.before_low_stock = None    # 之前的股票

# 定义计算协整关系的函数
def find_cointegrated_pairs(securities_df, sec_codes):
    n = len(sec_codes)
    # score用来判断股价是否是非平稳的,代表对股价进行单位根检定的t统计量
    # score_matrix = np.zeros((n, n))
    # 原假设是股票间不存在协整关系,如果Pvalue<0.05则拒绝原假设
    pvalue_matrix = np.ones((n, n))
    # 循环,计算两两股票间的协整关系,并将结果保存在矩阵中
    for i in range(n):
        for j in range(i+1, n):
            S1 = securities_df[securities_df["security"] == sec_codes[i]]['close']
            S2 = securities_df[securities_df["security"] == sec_codes[j]]['close']
            result = coint(S1, S2)
            score = abs(result[0])
            pvalue = result[1]
            if score > 3:
                pvalue_matrix[i, j] = pvalue
            else:
                pvalue_matrix[i, j] = 1
    return pvalue_matrix

# 每个单位时间(如果按天回测,则每天调用一次,如果按分钟,则每分钟调用一次)调用一次
def handle_data(context):
    print(context.current_dt)
    # 取得上证50股票列表
    symbol = get_index_stocks('sh000016')
    symbol_list = []
    for i in range(0, len(symbol)):
        symbol_list.append(symbol[i])

    # 取出上证50股票的价格
    securities_panel = get_price(
        symbol_list, count=100, end_date=context.current_dt, fields=['close'],fq=None, df=True)
    # 取得pvalue矩阵
    pvalues = find_cointegrated_pairs(securities_panel, symbol_list)
    # 找出矩阵pvalue的对角线以上的部分最小值得坐标
    # 由于对称关系,只需找对角线以上的就足够了

    index = (0, 0)
    m = 1
    num = len(symbol)
    for i in range(0, num):
        for j in range(i+1, num):
            if pvalues[i, j] < m:
                index = (i, j)
                m = pvalues[i, j]

    # 找出坐标后,根据坐标设置需要操作的两只股票
    g.security_high = symbol[index[0]]
    g.security_low = symbol[index[1]]

    set_universe([g.security_high, g.security_low])

    # 取出两只股票的价格
    hist_high = get_price(g.security_high, count=100, end_date=context.current_dt,
                          fields=None,fq=None)['close']
    hist_low = get_price(g.security_low, count=100, end_date=context.current_dt,
                         fields=None,fq=None)['close']

    # 计算价值中枢(回测时间内股价比值的均值)
    g.mean = np.mean(hist_high / hist_low)

    data = get_current_data()
    price_high = data[g.security_high].last_price
    price_low = data[g.security_low].last_price

    # 两只股票的股价比值
    price_ration = price_high / price_low

    # 记录换股:
    log.info('换股前', g.before_high_stock, g.security_high,
             g.before_low_stock, g.security_low)

    # 记录换股:
    log.info('换股后', g.before_high_stock, g.security_high,
             g.before_low_stock, g.security_low)

    # 当两股的股价比值超出价值中枢2%(阈值)时,执行卖出估价高的(security_high),买入股价低的(security_low)
    if price_ration > (1 + g.threadhold) * g.mean:
        # 卖出股价高的(security_high)买入股价低的(security_low)
        # 判断选股有没有变动
        if g.before_high_stock is None:
            g.before_high_stock = g.security_high
        else:
            if g.before_high_stock != g.security_high:
                if g.before_high_stock in context.portfolio.positions:
                    order_target(g.before_high_stock, 0)
                    cash = context.portfolio.available_cash
                    number_of_shares = int(cash/price_high)
                    # 下买入单
                    order_value(g.security_high, cash)
                    g.before_high_stock = g.security_high

        # 判断选股有没有变动
        if g.before_low_stock is None:
            g.before_low_stock = g.security_low
        else:
            if g.before_low_stock != g.security_low:
                if g.before_low_stock in context.portfolio.positions:
                    order_target(g.before_low_stock, 0)
                    cash = context.portfolio.available_cash
                    number_of_shares = int(cash/price_low)
                    # 下买入单
                    order_value(g.security_low, cash)
                    g.before_low_stock = g.security_low

        # 卖出股价高的
        if context.portfolio.positions[g.security_high].total_amount > 0:
            # 全部卖出股价高的
            order_target(g.security_high, 0)

        cash = context.portfolio.available_cash
        # 买入股价低的
        if cash > price_low * 100:
            # 得到能够购买的股数
            number_of_shares = int(cash/price_low)
            # 下买入单
            order_value(g.security_low, cash)

    # 当两股的股价比值低于价值中枢2%(阈值)时,执行卖出估价低的(security_low),买入股价高的(security_high)
    elif price_ration < (1-g.threadhold)*g.mean:
        # 判断选股有没有变动
        if g.before_high_stock is None:
            g.before_high_stock = g.security_high
        else:
            if g.before_high_stock != g.security_high:
                if g.before_high_stock in context.portfolio.positions:
                    order_target(g.before_high_stock, 0)
                    cash = context.portfolio.available_cash
                    number_of_shares = int(cash/price_high)
                    # 下买入单
                    order_value(g.security_high, cash)
                    g.before_high_stock = g.security_high

        # 判断选股有没有变动
        if g.before_low_stock is None:
            g.before_low_stock = g.security_low
        else:
            if g.before_low_stock != g.security_low:
                if g.before_low_stock in context.portfolio.positions:
                    order_target(g.before_low_stock, 0)
                    cash = context.portfolio.available_cash
                    number_of_shares = int(cash/price_low)
                    # 下买入单
                    order_value(g.security_low, cash)
                    g.before_low_stock = g.security_low

        # 卖出股价低的(security_low)买入股价高的(security_high)
        # 卖出股价低的
        if context.portfolio.positions[g.security_low].total_amount > 0:
            # 全部卖出股价低的
            order_target(g.security_low, 0)

        # 买入股价高的
        cash = context.portfolio.available_cash
        if cash > price_high*100:
            # 下买入单
            order_value(g.security_high, cash)

if __name__ == '__main__':
    run_main()

results matching ""

    No results matching ""