File size: 3,721 Bytes
ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 91fe931 ec552e6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# indicator.py
import pandas as pd
import numpy as np
# Try TA-Lib
try:
import talib
TALIB_AVAILABLE = True
except:
TALIB_AVAILABLE = False
# ==============================
# MACD
# ==============================
def calc_macd(df):
if TALIB_AVAILABLE:
macd, signal, hist = talib.MACD(df["Close"])
else:
ema12 = df["Close"].ewm(span=12).mean()
ema26 = df["Close"].ewm(span=26).mean()
macd = ema12 - ema26
signal = macd.ewm(span=9).mean()
hist = macd - signal
return pd.DataFrame({
"MACD": macd,
"Signal": signal,
"Histogram": hist
})
# ==============================
# RSI
# ==============================
def calc_rsi(df):
if TALIB_AVAILABLE:
rsi = talib.RSI(df["Close"], timeperiod=14)
else:
delta = df["Close"].diff()
gain = (delta.where(delta > 0, 0)).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return pd.DataFrame({"RSI": rsi})
# ==============================
# Supertrend (Custom)
# ==============================
def calc_supertrend(df, period=10, multiplier=3):
hl2 = (df["High"] + df["Low"]) / 2
tr1 = df["High"] - df["Low"]
tr2 = abs(df["High"] - df["Close"].shift())
tr3 = abs(df["Low"] - df["Close"].shift())
tr = tr1.combine(tr2, max).combine(tr3, max)
atr = tr.rolling(period).mean()
upperband = hl2 + multiplier * atr
lowerband = hl2 - multiplier * atr
st = pd.Series(index=df.index)
direction = pd.Series(index=df.index)
for i in range(len(df)):
if i == 0:
st.iloc[i] = upperband.iloc[i]
direction.iloc[i] = 1
else:
if df["Close"].iloc[i] > st.iloc[i - 1]:
st.iloc[i] = lowerband.iloc[i]
direction.iloc[i] = 1
else:
st.iloc[i] = upperband.iloc[i]
direction.iloc[i] = -1
return pd.DataFrame({
"Supertrend": st,
"Direction": direction
})
# ==============================
# Stochastic
# ==============================
def calc_stochastic(df):
if TALIB_AVAILABLE:
slowk, slowd = talib.STOCH(df["High"], df["Low"], df["Close"])
else:
low14 = df["Low"].rolling(14).min()
high14 = df["High"].rolling(14).max()
slowk = (df["Close"] - low14) * 100 / (high14 - low14)
slowd = slowk.rolling(3).mean()
return pd.DataFrame({"STOCH_K": slowk, "STOCH_D": slowd})
# ==============================
# Keltner Channel
# ==============================
def calc_keltner(df):
typical = (df["High"] + df["Low"] + df["Close"]) / 3
ema = typical.ewm(span=20).mean()
atr = (df["High"] - df["Low"]).rolling(20).mean()
upper = ema + 2 * atr
lower = ema - 2 * atr
return pd.DataFrame({"KC_UP": upper, "KC_MID": ema, "KC_LOW": lower})
# ==============================
# ZigZag (simplified)
# ==============================
def calc_zigzag(df, pct=3):
zigzag = [np.nan] * len(df)
last_pivot = df["Close"].iloc[0]
for i in range(1, len(df)):
change = (df["Close"].iloc[i] - last_pivot) / last_pivot * 100
if abs(change) >= pct:
zigzag[i] = df["Close"].iloc[i]
last_pivot = df["Close"].iloc[i]
return pd.DataFrame({"ZIGZAG": zigzag})
# ==============================
# Swing High / Low
# ==============================
def calc_swings(df, period=5):
swing_high = df["High"].rolling(period).max()
swing_low = df["Low"].rolling(period).min()
return pd.DataFrame({
"SWING_HIGH": swing_high,
"SWING_LOW": swing_low
})
|