まずは、Visual Studio Codeを起動してプログラムファイルを作成する
Visual Studio Code (VS Code)を起動したら新規ファイル(*.py)を作成して行1-332をコピペします。
ここでは、Jupyter NotebookのようにPythonのプログラムをセル単位で実行します。
VS Codeの場合は「#%%」から「#%%」の間がセルになります。
セルを選択したら[Ctrl + Enter]でセルのコードを実行します。
IPythonが起動されて「インタラクティブ」ウィンドウが表示されます。
「インタラクティブ」ウィンドウからはPythonのコードを入力して実行させることができます。
たとえば、「df.info()」を入力して[Shift + Enter]で実行します。
* Article.py:
# Daily returns vs Log returns article v71.py (Part 6) : Binance Crypto
# %%
### Import pandas, matplotlib, plotly libraries
import sys
import os
import math
import numpy as np
import datetime as dt
import time
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import plotly.offline as offline
import plotly.express as px
import plotly.graph_objs as go
import datetime as dt
from datetime import timedelta
from time import sleep
import requests
import warnings
# Suppress every Python warning for the rest of the session
warnings.simplefilter('ignore')
# Use matplotlib's built-in 'fivethirtyeight' style sheet for all plots
plt.style.use('fivethirtyeight')
# Cap the number of rows pandas shows when displaying a DataFrame at 10
pd.set_option('display.max_rows', 10)
# %%
###################################################### Get all crypto symbols (USDT)
def get_crypto_symbols(quote_currency='USDT') -> list:
    """Return every Binance symbol whose name ends with ``quote_currency``.

    Args:
        quote_currency: Suffix a symbol must end with to be kept (e.g. 'USDT').

    Returns:
        The matching symbols (e.g. 'BTCUSDT'), in the order the endpoint lists them.

    Raises:
        requests.exceptions.RequestException: On a timeout, a connection
            failure, or a 4xx/5xx response.
    """
    # Define Binance API endpoint
    endpoint = 'https://api.binance.com/api/v3/exchangeInfo'
    # A timeout keeps the script from hanging forever on a stalled connection
    response = requests.get(endpoint, timeout=30)
    # Fail loudly on an error status instead of hitting a KeyError on the payload below
    response.raise_for_status()
    payload = response.json()
    # Keep only the symbols that end with the specified quote currency
    return [info['symbol'] for info in payload['symbols'] if info['symbol'].endswith(quote_currency)]
####################################################################### Load crypto data from binance
def get_crypto(symbol='BTCUSDT', interval='1d', limit=500, start=None):
    """Download candlestick (kline) data for one symbol from Binance.

    Args:
        symbol: Trading pair, e.g. 'BTCUSDT'.
        interval: One of 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h,
            1d, 3d, 1w, 1M.
        limit: Number of candles to request; values above 1000 are clamped
            to 1000.
        start: Optional start time such as '2019-01-01 00:00:00'. When None,
            no ``startTime`` parameter is sent.

    Returns:
        A DataFrame indexed by 'date' with float columns open, high, low,
        close, volume and a 'symbol' column. An empty DataFrame is returned
        when the request or the parsing fails (the error is printed).
    """
    url = 'https://api.binance.com/api/v3/klines'
    params = {
        'symbol': symbol,
        'interval': interval,
        'limit': str(min(limit, 1000)),  # enforce max limit of 1000
    }
    if start is not None:
        # NOTE(review): a timezone-naive `start` appears to be converted using the
        # machine's local timezone here, while the API presumably expects UTC
        # milliseconds - confirm this is intended.
        params['startTime'] = int(dt.datetime.timestamp(pd.to_datetime(start)) * 1000)  # milliseconds (ms)
    try:
        # Example params:
        # {'symbol': 'BTCUSDT', 'interval': '1d', 'limit': '1000'}
        # {'symbol': 'BTCUSDT', 'interval': '1m', 'limit': '1000', 'startTime': 1672498800000}
        # The timeout prevents one stalled request from blocking the whole symbol loop
        kline = requests.get(url, params=params, timeout=30)
        kline.raise_for_status()  # raise an error for 4xx and 5xx status codes
        kline_json = kline.json()
        df = pd.DataFrame(kline_json, columns=['date', 'open', 'high', 'low', 'close', 'volume', 'close_time',
                                               'quote_asset_volume', 'number_of_trades', 'taker_buy_base_asset_volume',
                                               'taker_buy_quote_asset_volume', '_'])
        df = df.iloc[:, :6]  # only keep the columns for date, OHLC, and volume
        df['date'] = pd.to_datetime(df['date'], unit='ms')
        df = df.set_index('date')
        df = df.astype(float)
        df['symbol'] = symbol
        print(f"get_crypto({symbol=}, {interval}) => {df.shape[0]=}")
        return df
    except requests.exceptions.HTTPError as e:
        print(f"get_crypto({symbol=}, {interval}) HTTP error: {e}")
    except Exception as e:
        print(f"get_crypto({symbol=}, {interval}) exception error: {e}")
    # Both error paths fall through to here so callers can always test `.empty`
    return pd.DataFrame()
############################################ load crypto data from csv file
def get_data(csv_file: str) -> pd.DataFrame:
    """Load a saved price CSV and index it by its 'date' column.

    The interval is read from the file path: when the path contains one of
    the tokens 1d, 3d, 1w or 1M as a stand-alone token (not embedded in a
    longer alphanumeric word), dates are parsed as naive datetimes; otherwise
    they are parsed as timezone-aware UTC datetimes.

    Args:
        csv_file: Path of the CSV file; it must contain a 'date' column.

    Returns:
        The CSV contents as a DataFrame with 'date' as the index.
    """
    import re  # local import: `re` is not among the file-level imports

    print(f"Loading data: {csv_file} ")
    df = pd.read_csv(csv_file)
    # date,open,high,low,close,adj close,volume,symbol
    # Require non-alphanumeric neighbours so a symbol such as '1MBABYDOGEUSDT'
    # is not mistaken for the monthly ('1M') interval.
    daily_or_longer = re.search(r'(?<![0-9A-Za-z])(?:1d|3d|1w|1M)(?![0-9A-Za-z])', csv_file) is not None
    if daily_or_longer:  # '1d', '3d', '1w', '1M'
        df['date'] = pd.to_datetime(df['date'])  # convert to a datetime
    else:  # 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h
        df['date'] = pd.to_datetime(df['date'], utc=True)  # timezone-aware, UTC-localized datetime
    df.set_index(['date'], inplace=True)
    return df
############################################################### Calculate cumulative log return
def calculate_cum_log_return(df: pd.DataFrame) -> pd.DataFrame:
    """Add log-return and cumulative-return columns to ``df`` in place.

    Columns added:
        log_return: ln(close / previous close); the first row is NaN.
        cum_log_return: exp(cumulative sum of log_return) - 1, as a fraction.
        cum_log_return_pct: cum_log_return multiplied by 100.

    Args:
        df: Price data with a 'close' column and a 'symbol' column.

    Returns:
        The same DataFrame object, with the three columns added.
    """
    # Calculate log return
    df['log_return'] = np.log(df['close'] / df['close'].shift(1))
    # Calculate cumulative log return
    df['cum_log_return'] = np.exp(df['log_return'].cumsum()) - 1
    df['cum_log_return_pct'] = df['cum_log_return'] * 100
    if not df.empty:
        last_row = df.iloc[-1]
        # The '%' format spec already multiplies by 100, so format the fraction,
        # not the pre-scaled *_pct column (which would print a value 100x too large).
        print(f"Cumulative Log Return for {last_row['symbol']} = {last_row['cum_log_return']:.2%}")
    return df
#################################
# Main
#################################
### Get all symbols for quote currency (USDT)
symbols = get_crypto_symbols()
# symbols # 424
# print(symbols)
# %%
### Load the data from binance (or from the cached CSV) and collect each symbol's cumulative log return
interval = '1d'  # "1m", "3m", "5m", "15m", "30m", "1h", "2h", "4h", "6h", "8h", "12h", "1d", "3d", "1w", "1M"
limit = 1000
start = 'now'  # now or today
# start='2023-01-01 00:00:00'
df = pd.DataFrame()
symbol_list = []
cum_log_return_list = []
# symbols = ['BTCJPY'] # DEBUG
# Make sure the cache directory exists before the first to_csv() call
os.makedirs('data/csv', exist_ok=True)
# Iterate over a copy: failed symbols are removed from `symbols` inside the loop,
# and removing from the list being iterated would skip the following symbol.
for symbol in list(symbols):
    csv_file = f"data/csv/all_binance_cryptocurrencies({symbol})_{interval}.csv"  # data/csv/all_binance_cryptocurrencies(BTCUSDT)_1d.csv
    if not os.path.isfile(csv_file):
        # get_crypto(symbol='BTCUSDT', interval='1d', limit=500, start=any)
        # Exact match against the two keywords (a substring test would also accept '', 'o', ...)
        if start in ('today', 'now'):
            df = get_crypto(symbol, interval, limit)
        else:
            df = get_crypto(symbol, interval, limit, start)
        if df.empty:
            # Download failed: drop the symbol and move on, nothing was cached
            symbols.remove(symbol)
            continue
        df.to_csv(csv_file, index=True)
    df = get_data(csv_file)
    print(f"{csv_file=}, {df.shape} ")
    if df.empty:
        continue
    df = calculate_cum_log_return(df)
    # Infinite / NaN returns are not filtered here; that is done later on the summary DataFrame.
    cum_log_return = df.iloc[-1]['cum_log_return']
    symbol_list.append(symbol)
    cum_log_return_list.append(cum_log_return)
# Stop only when nothing at all was loaded (not merely when the last symbol failed)
if not symbol_list:
    print("Quit the program because no data could be loaded for any symbol")
    sys.exit(1)  # raises SystemExit, which also stops the current cell under IPython
# print(symbol_list)
# print(cum_log_return_list)
# %%
### Build the summary DataFrame: one row per symbol with its final cumulative log return
data = dict(symbol=symbol_list, cum_log_return=cum_log_return_list)
raw_df = pd.DataFrame(data)
# raw_df
# %%
# Count the missing values per column (shown as the cell's output)
raw_df.isnull().sum()
# %%
### Turn +/- infinity into NaN, then drop every row that contains a NaN
df = raw_df.replace([np.inf, -np.inf], np.nan).dropna(axis=0)
# Re-count the missing values per column after the cleanup
df.isnull().sum()
# %%
# Keep an untouched copy of the cleaned summary
raw2_df = df.copy()
### Print Top or Bottom 10 Cryptocurrencies by Cumulative Log Return, ranked from 1
best_df = df.nlargest(10, 'cum_log_return').reset_index(drop=True)
worst_df = df.nsmallest(10, 'cum_log_return').reset_index(drop=True)
# Replace the 0-based positions with 1-based ranks
best_df.index = pd.RangeIndex(1, len(best_df) + 1)
worst_df.index = pd.RangeIndex(1, len(worst_df) + 1)
# The trailing '' yields the blank separator line between the two tables
print('Top 10 Cryptocurrencies by Cumulative Log Return', '-'*60, best_df, '', sep='\n')
print('Bottom 10 Cryptocurrencies by Cumulative Log Return', '-'*60, worst_df, sep='\n')
# %%
### Same Top/Bottom 10 tables, with an extra column showing the return as a percentage string
best_df = df.nlargest(10, 'cum_log_return').reset_index(drop=True)
worst_df = df.nsmallest(10, 'cum_log_return').reset_index(drop=True)
# Replace the 0-based positions with 1-based ranks
best_df.index = pd.RangeIndex(1, len(best_df) + 1)
worst_df.index = pd.RangeIndex(1, len(worst_df) + 1)
# Render each return as text with two decimals and a percent sign
best_df['cum_log_return_pct'] = best_df['cum_log_return'].map('{:.2%}'.format)
worst_df['cum_log_return_pct'] = worst_df['cum_log_return'].map('{:.2%}'.format)
# The trailing '' yields the blank separator line between the two tables
print('Top 10 Cryptocurrencies by Cumulative Log Return', '-'*60, best_df, '', sep='\n')
print('Bottom 10 Cryptocurrencies by Cumulative Log Return', '-'*60, worst_df, sep='\n')
# %%
### Print Top or Bottom 10 Cryptocurrencies by Cumulative Log Return : Add gradation
best_df = df.nlargest(10, 'cum_log_return')
worst_df = df.nsmallest(10, 'cum_log_return')
best_df.reset_index(drop=True, inplace=True)
worst_df.reset_index(drop=True, inplace=True)
# Add 1 to each index
best_df.index = best_df.index + 1
worst_df.index = worst_df.index + 1
# Express the return in percent (fraction x 100); kept numeric so the gradient can be computed
best_df['cum_log_return_pct'] = best_df['cum_log_return'] * 100
worst_df['cum_log_return_pct'] = worst_df['cum_log_return'] * 100
# Print Top or Bottom 10 Cryptocurrencies by Cumulative Log Return : Print with gradation
from IPython.display import HTML
# Apply the background gradient to the percent column
best_styler = best_df.style.background_gradient(subset=['cum_log_return_pct'], axis=0)
# Styler.render() was removed in pandas 2.0; prefer to_html() and fall back only on old pandas
best_html_table = best_styler.to_html() if hasattr(best_styler, 'to_html') else best_styler.render()
# Display the HTML table as an output
HTML(best_html_table)
# %%
# Apply the background gradient to the percent column of the bottom-10 table
worst_styler = worst_df.style.background_gradient(subset=['cum_log_return_pct'], axis=0)
# Styler.render() was removed in pandas 2.0; prefer to_html() and fall back only on old pandas
worst_html_table = worst_styler.to_html() if hasattr(worst_styler, 'to_html') else worst_styler.render()
# Display the HTML table as an output
HTML(worst_html_table)
# %%
### Print Top or Bottom 10 Cryptocurrencies by Cumulative Log Return, one formatted line per coin
# reset_index() without drop keeps the old row labels in an 'index' column
best_df = df.nlargest(10, 'cum_log_return').reset_index()
worst_df = df.nsmallest(10, 'cum_log_return').reset_index()
# Replace the 0-based positions with 1-based ranks
best_df.index = pd.RangeIndex(1, len(best_df) + 1)
worst_df.index = pd.RangeIndex(1, len(worst_df) + 1)
sections = (
    ('Top 10 Cryptocurrencies by Cumulative Log Return (%)', best_df),
    ('Bottom 10 Cryptocurrencies by Cumulative Log Return (%)', worst_df),
)
for position, (heading, ranking) in enumerate(sections):
    if position:
        print()  # blank line between the two listings
    print(heading)
    print('-'*70)
    for rank, coin_symbol, coin_return in zip(ranking.index, ranking['symbol'], ranking['cum_log_return']):
        pct_text = f"{coin_return:.2%}"
        print(f"{rank}: {coin_symbol.ljust(15)} \t cumulative log return: {pct_text.rjust(15)}")
# %%
### Find the specific coin's rank
def _ordinal(n: int) -> str:
    """Return ``n`` with its English ordinal suffix (1st, 2nd, 3rd, 4th, 11th, ...)."""
    if 10 <= n % 100 <= 20:
        suffix = 'th'  # 11th, 12th, 13th are irregular
    else:
        suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th')
    return f"{n}{suffix}"


# Sort every coin from best to worst (and worst to best) and renumber from 0
best_df = df.nlargest(df.shape[0], 'cum_log_return')
worst_df = df.nsmallest(df.shape[0], 'cum_log_return')
best_df.reset_index(inplace=True)
worst_df.reset_index(inplace=True)
coin = 'BTCUSDT'  # BTCUSDT, ETHUSDT, LTCUSDT, MATICUSDT
find_coin = best_df['symbol'] == coin
found_df = best_df[find_coin]
if found_df.shape[0]:
    # The 0-based position in the best-to-worst ordering, plus one, is the rank
    rank = int(found_df.index.values[0]) + 1
    print(f"The {coin} coin is ranked {_ordinal(rank)}.")
else:
    print(f"The {coin} coin was not found in the ranking.")
# %%
図1にはVS Codeの画面が表示されています。
次のステップでは「セル」を選択して「セル」単位でPythonのコードを実行します。