python的zigzag实现及金融技术指标分析库

python

金融指标技术分析库:  

https://github.com/twopirllc/pandas-ta

zigzag的实现:

"""

reference:

https://github.com/jbn/ZigZag.git

"""

import numpy as np

PEAK = 1

VALLEY = -1

def identify_initial_pivot(X, up_thresh, down_thresh):

x_0 = X[0]

x_t = x_0

max_x = x_0

min_x = x_0

max_t = 0

min_t = 0

up_thresh += 1

down_thresh += 1

for t in range(1, len(X)):

x_t = X[t]

if x_t / min_x >= up_thresh:

return VALLEY if min_t == 0 else PEAK

if x_t / max_x <= down_thresh:

return PEAK if max_t == 0 else VALLEY

if x_t > max_x:

max_x = x_t

max_t = t

if x_t < min_x:

min_x = x_t

min_t = t

t_n = len(X)-1

return VALLEY if x_0 < X[t_n] else PEAK

def peak_valley_pivots(X, up_thresh, down_thresh):

"""

Find the peaks and valleys of a series.

:param X: the series to analyze

:param up_thresh: minimum relative change necessary to define a peak

:param down_thesh: minimum relative change necessary to define a valley

:return: an array with 0 indicating no pivot and -1 and 1 indicating

valley and peak

The First and Last Elements

---------------------------

The first and last elements are guaranteed to be annotated as peak or

valley even if the segments formed do not have the necessary relative

changes. This is a tradeoff between technical correctness and the

propensity to make mistakes in data analysis. The possible mistake is

ignoring data outside the fully realized segments, which may bias

analysis.

"""

if down_thresh > 0:

raise ValueError('The down_thresh must be negative.')

initial_pivot = identify_initial_pivot(X, up_thresh, down_thresh)

t_n = len(X)

pivots = np.zeros(t_n, dtype=np.int_)

trend = -initial_pivot

last_pivot_t = 0

last_pivot_x = X[0]

pivots[0] = initial_pivot

# Adding one to the relative change thresholds saves operations. Instead

# of computing relative change at each point as x_j / x_i - 1, it is

# computed as x_j / x_1. Then, this value is compared to the threshold + 1.

# This saves (t_n - 1) subtractions.

up_thresh += 1

down_thresh += 1

for t in range(1, t_n):

x = X[t]

r = x / last_pivot_x

if trend == -1:

if r >= up_thresh:

pivots[last_pivot_t] = trend

trend = PEAK

last_pivot_x = x

last_pivot_t = t

elif x < last_pivot_x:

last_pivot_x = x

last_pivot_t = t

else:

if r <= down_thresh:

pivots[last_pivot_t] = trend

trend = VALLEY

last_pivot_x = x

last_pivot_t = t

elif x > last_pivot_x:

last_pivot_x = x

last_pivot_t = t

if last_pivot_t == t_n-1:

pivots[last_pivot_t] = trend

elif pivots[t_n-1] == 0:

pivots[t_n-1] = -trend

return pivots

def max_drawdown(X):

"""

Compute the maximum drawdown of some sequence.

:return: 0 if the sequence is strictly increasing.

otherwise the abs value of the maximum drawdown

of sequence X

"""

mdd = 0

peak = X[0]

for x in X:

if x > peak:

peak = x

dd = (peak - x) / peak

if dd > mdd:

mdd = dd

return mdd if mdd != 0.0 else 0.0

def pivots_to_modes(pivots):

"""

Translate pivots into trend modes.

:param pivots: the result of calling ``peak_valley_pivots``

:return: numpy array of trend modes. That is, between (VALLEY, PEAK] it

is 1 and between (PEAK, VALLEY] it is -1.

"""

modes = np.zeros(len(pivots), dtype=np.int_)

mode = -pivots[0]

modes[0] = pivots[0]

for t in range(1, len(pivots)):

x = pivots[t]

if x != 0:

modes[t] = mode

mode = -x

else:

modes[t] = mode

return modes

def compute_segment_returns(X, pivots):

"""

:return: numpy array of the pivot-to-pivot returns for each segment."""

pivot_points = X[pivots != 0]

return pivot_points[1:] / pivot_points[:-1] - 1.0

使用示例:

import matplotlib

matplotlib.use("TkAgg")

import matplotlib.pyplot as plt

import numpy as np

import pandas as pd

import sys

import pathlib

sys.path.append("%s/zigzag" % pathlib.Path().absolute())

from zigzag import zigzag

def plot_pivots(X, pivots):

plt.xlim(0, len(X))

plt.ylim(X.min()*0.99, X.max()*1.01)

plt.plot(np.arange(len(X)), X, 'k:', alpha=0.5)

plt.plot(np.arange(len(X))[pivots != 0], X[pivots != 0], 'k-')

plt.scatter(np.arange(len(X))[pivots == 1], X[pivots == 1], color='g')

plt.scatter(np.arange(len(X))[pivots == -1], X[pivots == -1], color='r')

np.random.seed(1997)

X = np.cumprod(1 + np.random.randn(100) * 0.01)

pivots = zigzag.peak_valley_pivots(X, 0.03, -0.03)

plot_pivots(X, pivots)

plt.show()

modes = zigzag.pivots_to_modes(pivots)

print(pd.Series(X).pct_change().groupby(modes).describe().unstack())

print(zigzag.compute_segment_returns(X, pivots))

pandas 的数据输入示例:

from pandas_datareader import get_data_yahoo

X = get_data_yahoo('GOOG')['Adj Close']

pivots = peak_valley_pivots(X.values, 0.2, -0.2)

ts_pivots = pd.Series(X, index=X.index)

ts_pivots = ts_pivots[pivots != 0]

X.plot()

ts_pivots.plot(style='g-o');

以上是 python的zigzag实现及金融技术指标分析库 的全部内容, 来源链接: utcz.com/z/387143.html

回到顶部