|
|
|
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
# TA-Lib is optional: TALIB_AVAILABLE records whether the import worked so
# the indicator functions can choose between TA-Lib and their pandas fallback.
try:
    import talib
except Exception:
    # Deliberately broader than ImportError so that any failure while loading
    # the package still selects the fallback, but no longer a bare ``except``
    # that would also swallow KeyboardInterrupt / SystemExit.
    TALIB_AVAILABLE = False
else:
    TALIB_AVAILABLE = True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calc_macd(df, fast_period=12, slow_period=26, signal_period=9):
    """Compute the MACD line, its signal line and the histogram.

    Args:
        df: DataFrame with a "Close" column.
        fast_period: Span of the fast moving average (default 12).
        slow_period: Span of the slow moving average (default 26).
        signal_period: Span of the average applied to the MACD line to
            obtain the signal line (default 9).

    Returns:
        DataFrame with the columns "MACD", "Signal" and "Histogram".

    NOTE(review): the pandas fallback uses ``ewm(span=...)`` with pandas'
    default settings, so it is not guaranteed to be numerically identical
    to the TA-Lib branch -- confirm whether callers rely on either one.
    """
    close = df["Close"]

    if TALIB_AVAILABLE:
        macd, signal, hist = talib.MACD(
            close,
            fastperiod=fast_period,
            slowperiod=slow_period,
            signalperiod=signal_period,
        )
    else:
        fast_ema = close.ewm(span=fast_period).mean()
        slow_ema = close.ewm(span=slow_period).mean()
        macd = fast_ema - slow_ema
        signal = macd.ewm(span=signal_period).mean()
        hist = macd - signal

    return pd.DataFrame({
        "MACD": macd,
        "Signal": signal,
        "Histogram": hist,
    })
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calc_rsi(df, period=14):
    """Compute the Relative Strength Index of the closing prices.

    Args:
        df: DataFrame with a "Close" column.
        period: Look-back length in rows (default 14).

    Returns:
        DataFrame with a single "RSI" column.

    The pandas fallback averages gains and losses with a plain rolling mean.
    NOTE(review): that is not guaranteed to match the smoothing used by
    ``talib.RSI`` -- confirm whether the two branches must agree.
    """
    close = df["Close"]

    if TALIB_AVAILABLE:
        rsi = talib.RSI(close, timeperiod=period)
    else:
        delta = close.diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)

        # diff() has no value for the first row.  ``where`` turned that NaN
        # into 0, which made the first rolling window count a change that
        # never happened.  Restore the NaN so the first RSI value is only
        # produced once ``period`` real price changes are available.
        gain.iloc[:1] = np.nan
        loss.iloc[:1] = np.nan

        avg_gain = gain.rolling(period).mean()
        avg_loss = loss.rolling(period).mean()
        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))

    return pd.DataFrame({"RSI": rsi})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calc_supertrend(df, period=10, multiplier=3):
    """Compute a Supertrend-style trailing line and its direction flag.

    Args:
        df: DataFrame with "High", "Low" and "Close" columns.
        period: Rolling window used to average the true range.
        multiplier: Number of average true ranges added to / subtracted
            from the high-low midpoint to form the bands.

    Returns:
        DataFrame indexed like ``df`` with float columns "Supertrend"
        (the active band) and "Direction" (1 or -1).

    For every row after the first, the close is compared with the previous
    row's Supertrend value: above it, the lower band is used and the
    direction is 1; otherwise the upper band is used and the direction is -1.
    The first row always takes the upper band with direction 1.

    NOTE(review): the bands are recomputed from the midpoint on every row
    and are never carried forward -- confirm this simplification is intended.
    """
    high = df["High"]
    low = df["Low"]
    close = df["Close"]
    prev_close = close.shift()

    hl2 = (high + low) / 2

    # True range: largest of the three candidate ranges.  The row-wise max
    # ignores the NaNs caused by the missing previous close on the first row,
    # while rows whose own high-low range is NaN stay NaN.
    bar_range = high - low
    candidates = pd.concat(
        [bar_range, abs(high - prev_close), abs(low - prev_close)],
        axis=1,
    )
    tr = candidates.max(axis=1).where(bar_range.notna())

    atr = tr.rolling(period).mean()
    upperband = (hl2 + multiplier * atr).to_numpy(dtype=float)
    lowerband = (hl2 - multiplier * atr).to_numpy(dtype=float)
    closes = close.to_numpy(dtype=float)

    # Explicit float buffers: an un-typed ``pd.Series(index=...)`` can end up
    # with object dtype, and element-wise ``.iloc`` writes are slow.
    n = len(df)
    st = np.full(n, np.nan)
    direction = np.full(n, np.nan)

    if n:
        st[0] = upperband[0]
        direction[0] = 1

    for i in range(1, n):
        # A comparison against NaN is False, so rows without a valid
        # previous value fall through to the upper band.
        if closes[i] > st[i - 1]:
            st[i] = lowerband[i]
            direction[i] = 1
        else:
            st[i] = upperband[i]
            direction[i] = -1

    return pd.DataFrame(
        {"Supertrend": st, "Direction": direction},
        index=df.index,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calc_stochastic(df):
    """Compute the stochastic oscillator %K and %D lines.

    Args:
        df: DataFrame with "High", "Low" and "Close" columns.

    Returns:
        DataFrame with the columns "STOCH_K" and "STOCH_D".

    NOTE(review): the TA-Lib branch calls ``talib.STOCH`` with the library's
    default periods, whereas the pandas fallback hard-codes a 14-row range
    and a 3-row average -- confirm the two branches are meant to agree.
    """
    if not TALIB_AVAILABLE:
        # Pandas fallback: position of the close inside the 14-row
        # low/high range, scaled to 0-100, then averaged over 3 rows.
        lowest_low = df["Low"].rolling(14).min()
        highest_high = df["High"].rolling(14).max()
        price_range = highest_high - lowest_low
        k_line = (df["Close"] - lowest_low) * 100 / price_range
        d_line = k_line.rolling(3).mean()
    else:
        k_line, d_line = talib.STOCH(df["High"], df["Low"], df["Close"])

    columns = {"STOCH_K": k_line, "STOCH_D": d_line}
    return pd.DataFrame(columns)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calc_keltner(df, period=20, multiplier=2):
    """Compute Keltner-channel style bands around an EMA of the typical price.

    Args:
        df: DataFrame with "High", "Low" and "Close" columns.
        period: Span of the EMA and window of the range average (default 20).
        multiplier: Number of average ranges between the middle line and
            each band (default 2).

    Returns:
        DataFrame with the columns "KC_UP", "KC_MID" and "KC_LOW".
    """
    typical = (df["High"] + df["Low"] + df["Close"]) / 3
    middle = typical.ewm(span=period).mean()

    # Band width is the rolling mean of the plain high-low range; the
    # previous close is not taken into account.
    avg_range = (df["High"] - df["Low"]).rolling(period).mean()

    upper = middle + multiplier * avg_range
    lower = middle - multiplier * avg_range

    return pd.DataFrame({"KC_UP": upper, "KC_MID": middle, "KC_LOW": lower})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calc_zigzag(df, pct=3):
    """Mark closes that moved at least ``pct`` percent away from the last pivot.

    Args:
        df: DataFrame with a "Close" column.
        pct: Minimum absolute percentage change from the last pivot for a
            row to become the new pivot.

    Returns:
        DataFrame indexed like ``df`` with one "ZIGZAG" column holding the
        close at pivot rows and NaN elsewhere.  The first row is the initial
        reference and is never marked.  An empty ``df`` gives an empty result.
    """
    closes = df["Close"].to_numpy(dtype=float)
    zigzag = np.full(len(closes), np.nan)

    # Guard the empty case: there is no first close to use as the reference.
    if len(closes):
        last_pivot = closes[0]
        for i in range(1, len(closes)):
            price = closes[i]
            change = (price - last_pivot) / last_pivot * 100
            if abs(change) >= pct:
                zigzag[i] = price
                last_pivot = price

    # Keep the caller's index so the column lines up with ``df``.
    return pd.DataFrame({"ZIGZAG": zigzag}, index=df.index)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calc_swings(df, period=5):
    """Return the rolling highest high and lowest low.

    Args:
        df: DataFrame with "High" and "Low" columns.
        period: Number of rows in each rolling window.

    Returns:
        DataFrame with the columns "SWING_HIGH" (rolling maximum of "High")
        and "SWING_LOW" (rolling minimum of "Low").
    """
    highs = df["High"]
    lows = df["Low"]

    columns = {}
    columns["SWING_HIGH"] = highs.rolling(period).max()
    columns["SWING_LOW"] = lows.rolling(period).min()
    return pd.DataFrame(columns)
|
|
|