示例策略
默认模板(框架)
"""
默认策略
示例策略仅供参考,不建议直接实盘使用。
"""
from zltquant import *
def initialize(context):
set_benchmark('sh000300')
print('策略开始运行,初始化函数全局只运行一次')
set_order_cost(OrderCost(open_tax=0, close_tax=0.001, open_commission=0.0003, close_commission=0.0003, close_today_commission=0, min_commission=5), type='stock')
set_slippage(FixedSlippage(0.02))
g.security = []
def before_trading(context):
date = context.current_dt.strftime('%Y-%m-%d %H:%M:%S')
print('{} 盘前运行'.format(date))
def handle_data(context):
time = context.current_dt.strftime('%Y-%m-%d %H:%M:%S')
print('{} 盘中运行'.format(time))
def after_trading(context):
time = context.current_dt.strftime('%Y-%m-%d %H:%M:%S')
print('{} 盘后运行'.format(time))
print('一天结束')
if __name__ == '__main__':
run_main()
均线择时
import pandas as pd
from zltquant import *
def initialize(context):
g.security = 'sz000001'
def handle_data(context):
short = 12
long = 26
mid = 9
bar = get_price(g.security, end_date=context.current_dt, count=long + mid, frequency='1d', fq='pre', fill_suspend=True)
close = pd.Series(bar['close'])
ema_short = close.ewm(span=short, adjust=False).mean()
ema_long = close.ewm(span=long, adjust=False).mean()
diff = ema_short - ema_long
dea = diff.ewm(span=mid, adjust=False).mean()
macd = (diff - dea) * 2
update_cash_info()
if diff.iloc[-1] > dea.iloc[-1] and macd.iloc[-1] > macd.iloc[-2] and g.security not in context.portfolio.positions.keys():
order_value(g.security, context.portfolio.available_cash)
elif diff.iloc[-1] < dea.iloc[-1] and macd.iloc[-1] < macd.iloc[-2] and g.security in context.portfolio.positions.keys():
order_target(g.security, 0)
if __name__ == '__main__':
run_main()
布林线均值回归
"""
- 当股票价格触及布林线上轨且有持仓时, 卖出股票.
- 当股票价格触及布林线下轨且无持仓时, 买入股票.
"""
from zltquant import *
def initialize(context):
set_benchmark('sh000300')
g.stock = 'sh600000'
g.window = 20
g.num_std = 2
run_daily(trade, time='09:31')
def trade(context):
dbar1 = get_price(g.stock, end_date=context.current_dt, count=g.window, fq="pre")
stock_data = pd.Series(dbar1['close'])
mean = stock_data.rolling(window=g.window).mean().iloc[-1]
std = stock_data.rolling(window=g.window).std().iloc[-1]
upper_band = mean + g.num_std * std
lower_band = mean - g.num_std * std
current_price = stock_data.iloc[-1]
current_position = context.portfolio.positions[g.stock].total_amount
if current_price > upper_band and current_position > 0:
order_target(g.stock, 0)
log.info("卖出股票:%s" % g.stock)
elif current_price < lower_band and current_position == 0:
order_value(g.stock, context.portfolio.available_cash)
log.info("买入股票:%s" % g.stock)
update_cash_info()
def after_trading_end(context):
log.info("每日收盘后运行")
pass
if __name__ == '__main__':
run_main()
多因子选股
from zltquant import *
import datetime
def initialize(context):
run_daily(select_and_trade, time='every_bar')
def select_and_trade(context):
selected_stocks = select_stocks(context)
buy_list, sell_list = update_order_list(context,selected_stocks)
trade(context, buy_list, sell_list)
def select_stocks(context):
stock_list = get_index_stocks('sz399006',date=context.current_dt)
selected_stocks = []
for stock in stock_list:
hist = get_price(stock, end_date=context.current_dt, frequency='1d', fields=['close'], count=60, fq='pre',df=True)
std_dev = hist['close'].std()
mean_price = hist['close'].mean()
normalized_volatility = std_dev / mean_price
short_ma = hist['close'].rolling(window=5).mean().iloc[-1]
long_ma = hist['close'].rolling(window=20).mean().iloc[-1]
hist_volume = get_price(stock, end_date=context.current_dt, frequency='1d', fields=['money'], count=20, fq='pre',df=True)
avg_turnover = hist_volume['money'].mean()
if normalized_volatility > 0.2 and short_ma > long_ma and avg_turnover > 100000000:
selected_stocks.append(stock)
log.info('选中的股票列表:{}'.format(selected_stocks))
return selected_stocks
def update_order_list(context,selected_stocks):
buy_list = []
sell_list = []
long_positions = context.portfolio.positions
for stock in long_positions:
if stock not in selected_stocks:
sell_list.append(stock)
for stock in selected_stocks:
if stock not in long_positions:
buy_list.append(stock)
return buy_list, sell_list
def trade(context, buy_list, sell_list):
for stock in sell_list:
order_target(stock, 0)
update_cash_info()
log.info('卖出股票:{}'.format(stock))
if len(buy_list) != 0:
cash = context.portfolio.available_cash / len(buy_list)
for stock in buy_list:
order_target_value(stock, cash)
update_cash_info()
log.info('买入股票:{}'.format(stock))
if __name__ == '__main__':
run_main()
趋势跟踪
import pandas as pd
from zltquant import *
import numpy as np
def initialize(context):
g.symbol = 'sz000001'
g.last_grid = 0
g.center = get_previous_close_price(context)
g.grid_change_last = [0, 0]
g.band_width = update_band_width(context)
g.band = [g.center + i * g.band_width for i in range(-3, 4)]
run_daily(cut_loss, time='every_bar')
run_daily(check_grid, time='every_bar')
def get_previous_close_price(context):
hist = get_price(security=g.symbol, end_date=context.current_dt.strftime("%Y-%m-%d %H:%M:%S"), frequency='1d',fields=['close'], count=1, fq='pre')
print(hist)
return hist['close'][0]
def update_band_width(context):
hist = get_price(security=g.symbol, end_date=context.current_dt.strftime("%Y-%m-%d %H:%M:%S"), frequency='1d',
fields=['close'], count=60, fq='pre')
std_dev = hist['close'].std()
return round(std_dev, 2)
def check_grid(context):
current_price = get_price(security=g.symbol, end_date=context.current_dt.strftime("%Y-%m-%d %H:%M:%S"),
frequency='1m', fields=['close'], count=1, fq='pre')['close'][0]
update_cash_info()
position = context.portfolio.positions[g.symbol].total_amount if g.symbol in context.portfolio.positions else 0
grid = pd.cut([current_price], g.band, labels=[-3, -2, -1, 1, 2, 3])[0]
if np.isnan(grid):
log.info('价格波动超过价格区间,可适当调节价格区间宽度和数量')
g.last_grid = 0
g.center = get_previous_close_price(context)
g.grid_change_last = [0, 0]
g.band_width = update_band_width(context)
g.band = [g.center + i * g.band_width for i in range(-3, 4)]
return
if g.last_grid < grid:
if g.last_grid == 0:
g.last_grid = grid
g.grid_change_last = [g.last_grid, grid]
return
if g.last_grid != 0 and position > 0:
current_price = get_price(security=g.symbol, end_date=context.current_dt.strftime("%Y-%m-%d %H:%M:%S"),
frequency='1m', fields=['close'], count=1, fq='pre')['close'][0]
order_target(g.symbol, 0, MarketOrderStyle(current_price * 0.98))
log.info('以市价单卖出{}股'.format(position))
log.info('上一个grid:{},当前grid:{},当前价格:{}'.format(g.last_grid, grid, current_price))
g.last_grid = grid
g.grid_change_last = [g.last_grid, grid]
update_cash_info()
if g.last_grid > grid:
if g.last_grid == 0:
g.last_grid = grid
g.grid_change_last = [g.last_grid, grid]
return
if g.last_grid != 0 and position == 0:
current_price = get_price(security=g.symbol, end_date=context.current_dt.strftime("%Y-%m-%d %H:%M:%S"),
frequency='1m', fields=['close'], count=1, fq='pre')['close'][0]
order_target_value(g.symbol, context.portfolio.available_cash * 0.9, MarketOrderStyle(current_price * 1.02))
log.info('以市价单买入{} 元'.format(context.portfolio.available_cash * 0.9))
log.info('上一个grid:{},当前grid:{},当前价格:{}'.format(g.last_grid, grid, current_price))
g.last_grid = grid
g.grid_change_last = [g.last_grid, grid]
update_cash_info()
def cut_loss(context):
avg_cost = context.portfolio.positions[g.symbol].avg_cost if g.symbol in context.portfolio.positions else 0
price = context.portfolio.positions[g.symbol].price if g.symbol in context.portfolio.positions else 0
if avg_cost < price * 0.9:
position = context.portfolio.positions[g.symbol].total_amount if g.symbol in context.portfolio.positions else 0
order_target(g.symbol, 0, MarketOrderStyle(price * 0.98))
log.info('触发止损线,以市价单卖出{}股'.format(position))
update_cash_info()
if __name__ == '__main__':
run_main()
均值回复策略
from zltquant import *
from datetime import *
def initialize(context):
set_benchmark('sh000300')
log.info('初始函数开始运行且全局只运行一次')
run_daily(after_market_close, time='15:30')
run_monthly(adjust_position, time='10:30', monthday=1)
g.observe_days = 20
g.target_value = 10000
g.stock_number = 10
def adjust_position(context):
stocks = get_index_stocks('sh000300')
stocks = filter_stocks(context, stocks)
df = get_price(stocks, count=g.observe_days, end_date=context.current_dt, frequency='1d', fields=['close'],fq='pre',df=True)
grouped = df.groupby('security').agg({
'close': ['first', 'last']
}).reset_index()
grouped.columns = ['code', 'start_price', 'end_price']
grouped['return'] = (grouped['end_price'] - grouped['start_price']) / grouped['start_price']
grouped_sorted = grouped.sort_values(by='return', ascending=True)
bad_list = grouped_sorted['code'][:g.stock_number]
good_list = grouped_sorted['code'][-g.stock_number:]
log.info('表现差的股票:%s' % bad_list)
log.info('表现好的股票:%s' % good_list)
new_long_list = bad_list
new_short_list = good_list
portfolio = context.portfolio
long_positions = list(portfolio.positions.keys())
log.info("positions", long_positions)
for stock in long_positions:
if stock not in new_long_list:
amount = portfolio.positions[stock].closeable_amount
log.info("多头平仓,股票:%s" % stock)
order(stock, -amount)
update_cash_info()
for stock in new_long_list:
if stock not in long_positions:
log.info("多头开仓,股票:%s" % stock)
order_target_value(stock, g.target_value)
update_cash_info()
def filter_stocks(context, stock_list):
filtered_stocks = []
for stock in stock_list:
stock_code = stock[2:]
if stock_code.startswith('30') or stock_code.startswith('68') or stock_code.startswith('8') or stock_code.startswith('4'):
continue
start_date = get_security_info(stock).start_date
if context.previous_date - start_date < timedelta(days=375):
continue
filtered_stocks.append(stock)
return filtered_stocks
def after_market_close(context):
p = context.portfolio
log.info('- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -')
log.info('总资产:', p.total_value)
log.info('##############################################################')
if __name__ == '__main__':
run_main()
海龟交易法则
from zltquant import *
import pandas as pd
import datetime
import math
from collections import deque
def initialize(context):
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
g.security = 'sh601012'
set_benchmark(g.security)
g.in_day = 20
g.out_day = 10
g.current_units = 0
g.today_units = 0
g.current_units = 0
g.N=deque(maxlen=19)
g.current_N = 0
g.last_buy_price = 0
price = get_price(security=g.security, end_date=context.current_dt,count=g.N.maxlen*2+1,frequency= '1d',fields=['high', 'low', 'close'])
print('ff',g.N.maxlen+1, g.N.maxlen*2+1)
for i in range(g.N.maxlen+1, g.N.maxlen*2+1):
li = []
for j in range(i-19,i+1):
a = price['high'][j]-price['low'][j]
b = abs(price['high'][j]-price['close'][j-1])
c = abs(price['low'][j]-price['close'][j-1])
li.append(max(a,b,c))
current_N = np.array(li).mean()
g.N.append(current_N)
def before_trading_start(context):
g.current_N = cal_N(context)
g.today_units = 0
def handle_data(context):
dt = context.current_dt
data=get_current_data()
current_price = data[g.security].last_price
value = context.portfolio.total_value
cash = context.portfolio.available_cash
unit = math.floor(value * 0.01 / g.current_N)
if g.current_units == 0:
buy(context,current_price, cash, unit)
else:
if stop_loss(current_price):
return
if sell(current_price):
return
addin(current_price, cash, unit)
def cal_N(context):
price = get_price(security=g.security, end_date=context.current_dt,count=2,frequency= '1d',fields=['high', 'low', 'close'])
a = price['high'][1]-price['low'][1]
b = abs(price['high'][1]-price['close'][0])
c = abs(price['low'][1]-price['close'][0])
current_N = (max(a,b,c) + np.array(g.N).sum())/(g.N.maxlen+1)
g.N.append(current_N)
print('nnn',g.N)
return current_N
def buy(context,current_price, cash, unit):
price = get_price(security=g.security, end_date=context.current_dt, count=g.in_day,frequency= '1d',fields=['close'])
if current_price > max(price['close']):
print('ppp',current_price ,price['close'])
shares = cash / current_price
if shares >= unit:
print("buying %d" % unit)
o = order(g.security, unit)
o = get_orders()[o]
g.last_buy_price = float(o.price)
g.current_units += 1
g.today_units += 1
return True
return False
def addin(current_price, cash, unit):
if current_price >= g.last_buy_price + 0.5 * g.current_N:
shares = cash / current_price
if shares >= unit:
print("adding %d" % unit)
o = order(g.security, unit)
o = get_orders()[o]
g.last_buy_price = float(o.price)
g.current_units += 1
g.today_units += 1
return True
return False
def sell(current_price):
price = attribute_history(g.security, g.out_day, '1d', ('close',))
if current_price < min(price['close']):
print("selling")
order_target(g.security, 0)
g.current_units = g.today_units
return True
return False
def stop_loss(current_price):
if current_price < g.last_buy_price - 2 * g.current_N:
print("stop loss")
order_target(g.security, 0)
g.current_units = g.today_units
return True
return False
if __name__ == '__main__':
run_main()
小市值策略
from zltquant import *
import numpy as np
import pandas as pd
from datetime import time,date
def initialize(context):
set_benchmark('sz399102')
g.trading_signal = True
g.hold_list = []
g.yesterday_HL_list = []
g.target_list = []
g.limitup_stocks = []
g.min_mv = 10
g.max_mv = 100
g.stock_num = 4
g.stoploss_list = []
g.other_sale = []
g.highest = 50
run_daily(prepare_work, '9:05')
run_daily(trade_afternoon, time='14:20')
run_weekly(weekly_adjustment,'09:31', 3)
def prepare_work(context):
g.limitup_stocks = []
g.hold_list = list(context.portfolio.positions)
if g.hold_list:
df = get_price(g.hold_list, end_date=context.current_dt, frequency='daily', fields=['close'], count=2, df=True, fill_suspend=False)
for stock in g.hold_list:
if df.loc[df['security']==stock,'close'].iloc[0] >= df.loc[df['security']==stock,'close'].iloc[1] * 1.2:
g.yesterday_HL_list.append(stock)
else:
g.yesterday_HL_list = []
def get_stock_list(context):
final_list = []
MKT_index = 'sz399102'
initial_list = get_index_stocks(MKT_index)
data = get_valuation_indicators(initial_list, ['market_cap'])
data = data[data['market_cap'] > g.min_mv * 100000000]
data = data[data['market_cap'] < g.max_mv * 100000000]
data = data.sort_values(by='market_cap', ascending=True)
data = data.head(g.stock_num*3)
stock_list = data['security'].tolist()
data = get_fundamentals_pit(stock_list, ['is_parent_net_profit','is_net_profit','is_op_rev'])
data = data[data['is_parent_net_profit'] > 0]
data = data[data['is_net_profit'] > 0]
data = data[data['is_op_rev'] > 1e8]
final_list = data['security'].tolist()
return final_list
def weekly_adjustment(context):
if g.trading_signal:
g.target_list = get_stock_list(context)[:g.stock_num]
log.info(f"今日股票池:{str(g.target_list)}")
sell_list = [stock for stock in g.hold_list if stock not in g.target_list and stock not in g.yesterday_HL_list]
hold_list = [stock for stock in g.hold_list if stock in g.target_list or stock in g.yesterday_HL_list]
log.info("卖出[%s]" % (str(sell_list)))
log.info("已持有[%s]" % (str(hold_list)))
for stock in sell_list:
order_target_value(stock, 0)
buy_list = [stock for stock in g.target_list if stock not in g.hold_list]
buy_security(context, buy_list,len(buy_list))
else:
log.info('该月份为空仓月份')
def deal_up(context):
now_time = context.current_dt
if g.yesterday_HL_list != []:
for stock in g.yesterday_HL_list:
current_data = get_price(stock, end_date=now_time, frequency='1m', fields=['close','high_limit'], skip_paused=False, fq='pre', count=1, df=True, fill_suspend=True)
if current_data.iloc[0,0] < current_data.iloc[0,1]:
log.info("[%s]涨停打开,卖出" % (stock))
order_target_value(stock, 0)
g.other_sale.append(stock)
g.limitup_stocks.append(stock)
else:
log.info("[%s]涨停,继续持有" % (stock))
def trade_afternoon(context):
if g.trading_signal:
deal_up(context)
def buy_security(context,target_list,num):
position_count = len(context.portfolio.positions)
target_num = num
if target_num !=0:
value = context.portfolio.available_cash / target_num
for stock in target_list:
order_target_value(stock, value)
log.info("买入[%s](%s元)" % (stock,value))
if len(context.portfolio.positions) == g.stock_num:
break
if __name__ == '__main__':
run_main()
股票配对交易
import scipy.stats as stats
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
import statsmodels
from statsmodels.tsa.stattools import coint
from zltquant import *
def initialize(context):
# 设定上证50作为基准
set_benchmark('sh000016')
# 交易量设置 开仓量不超过当前25%
set_option('order_volume_ratio', 0.25)
g.threadhold = 0.02 # 阈值
g.before_high_stock = None # 之前的股票
g.before_low_stock = None # 之前的股票
# 定义计算协整关系的函数
def find_cointegrated_pairs(securities_df, sec_codes):
n = len(sec_codes)
# score用来判断股价是否是非平稳的,代表对股价进行单位根检定的t统计量
# score_matrix = np.zeros((n, n))
# 原假设是股票间不存在协整关系,如果Pvalue<0.05则拒绝原假设
pvalue_matrix = np.ones((n, n))
# 循环,计算两两股票间的协整关系,并将结果保存在矩阵中
for i in range(n):
for j in range(i+1, n):
S1 = securities_df[securities_df["security"] == sec_codes[i]]['close']
S2 = securities_df[securities_df["security"] == sec_codes[j]]['close']
result = coint(S1, S2)
score = abs(result[0])
pvalue = result[1]
if score > 3:
pvalue_matrix[i, j] = pvalue
else:
pvalue_matrix[i, j] = 1
return pvalue_matrix
# 每个单位时间(如果按天回测,则每天调用一次,如果按分钟,则每分钟调用一次)调用一次
def handle_data(context):
print(context.current_dt)
# 取得上证50股票列表
symbol = get_index_stocks('sh000016')
symbol_list = []
for i in range(0, len(symbol)):
symbol_list.append(symbol[i])
# 取出上证50股票的价格
securities_panel = get_price(
symbol_list, count=100, end_date=context.current_dt, fields=['close'],fq=None, df=True)
# 取得pvalue矩阵
pvalues = find_cointegrated_pairs(securities_panel, symbol_list)
# 找出矩阵pvalue的对角线以上的部分最小值得坐标
# 由于对称关系,只需找对角线以上的就足够了
index = (0, 0)
m = 1
num = len(symbol)
for i in range(0, num):
for j in range(i+1, num):
if pvalues[i, j] < m:
index = (i, j)
m = pvalues[i, j]
# 找出坐标后,根据坐标设置需要操作的两只股票
g.security_high = symbol[index[0]]
g.security_low = symbol[index[1]]
set_universe([g.security_high, g.security_low])
# 取出两只股票的价格
hist_high = get_price(g.security_high, count=100, end_date=context.current_dt,
fields=None,fq=None)['close']
hist_low = get_price(g.security_low, count=100, end_date=context.current_dt,
fields=None,fq=None)['close']
# 计算价值中枢(回测时间内股价比值的均值)
g.mean = np.mean(hist_high / hist_low)
data = get_current_data()
price_high = data[g.security_high].last_price
price_low = data[g.security_low].last_price
# 两只股票的股价比值
price_ration = price_high / price_low
# 记录换股:
log.info('换股前', g.before_high_stock, g.security_high,
g.before_low_stock, g.security_low)
# 记录换股:
log.info('换股后', g.before_high_stock, g.security_high,
g.before_low_stock, g.security_low)
# 当两股的股价比值超出价值中枢2%(阈值)时,执行卖出估价高的(security_high),买入股价低的(security_low)
if price_ration > (1 + g.threadhold) * g.mean:
# 卖出股价高的(security_high)买入股价低的(security_low)
# 判断选股有没有变动
if g.before_high_stock is None:
g.before_high_stock = g.security_high
else:
if g.before_high_stock != g.security_high:
if g.before_high_stock in context.portfolio.positions:
order_target(g.before_high_stock, 0)
cash = context.portfolio.available_cash
number_of_shares = int(cash/price_high)
# 下买入单
order_value(g.security_high, cash)
g.before_high_stock = g.security_high
# 判断选股有没有变动
if g.before_low_stock is None:
g.before_low_stock = g.security_low
else:
if g.before_low_stock != g.security_low:
if g.before_low_stock in context.portfolio.positions:
order_target(g.before_low_stock, 0)
cash = context.portfolio.available_cash
number_of_shares = int(cash/price_low)
# 下买入单
order_value(g.security_low, cash)
g.before_low_stock = g.security_low
# 卖出股价高的
if context.portfolio.positions[g.security_high].total_amount > 0:
# 全部卖出股价高的
order_target(g.security_high, 0)
cash = context.portfolio.available_cash
# 买入股价低的
if cash > price_low * 100:
# 得到能够购买的股数
number_of_shares = int(cash/price_low)
# 下买入单
order_value(g.security_low, cash)
# 当两股的股价比值低于价值中枢2%(阈值)时,执行卖出估价低的(security_low),买入股价高的(security_high)
elif price_ration < (1-g.threadhold)*g.mean:
# 判断选股有没有变动
if g.before_high_stock is None:
g.before_high_stock = g.security_high
else:
if g.before_high_stock != g.security_high:
if g.before_high_stock in context.portfolio.positions:
order_target(g.before_high_stock, 0)
cash = context.portfolio.available_cash
number_of_shares = int(cash/price_high)
# 下买入单
order_value(g.security_high, cash)
g.before_high_stock = g.security_high
# 判断选股有没有变动
if g.before_low_stock is None:
g.before_low_stock = g.security_low
else:
if g.before_low_stock != g.security_low:
if g.before_low_stock in context.portfolio.positions:
order_target(g.before_low_stock, 0)
cash = context.portfolio.available_cash
number_of_shares = int(cash/price_low)
# 下买入单
order_value(g.security_low, cash)
g.before_low_stock = g.security_low
# 卖出股价低的(security_low)买入股价高的(security_high)
# 卖出股价低的
if context.portfolio.positions[g.security_low].total_amount > 0:
# 全部卖出股价低的
order_target(g.security_low, 0)
# 买入股价高的
cash = context.portfolio.available_cash
if cash > price_high*100:
# 下买入单
order_value(g.security_high, cash)
if __name__ == '__main__':
run_main()