Quick jump:
Introduction
This method will work with any websocket data containing a timestamp and price. I’m going to demonstrate this using EODHD’s “crypto” websocket end-point. It’s important to note that websocket end-points for different asset types may be presented differently. If that is the case you will need to make the adjustments to the code. For example in the cryptocurrencies end-point the price is represented as “p” and the timestamp is represented as “t” as a UNIX timestamp. If you are going to try this with the “us”, “us-quote”, or “forex” end-points, you need to make sure this looks the same for those end-points. If it isn’t, then just map the fields accordingly. It would also be worth checking the format of the timestamp field. For the “crypto” example below the timestamp is a UNIX timestamp. If you are working with source data where this isn’t the case, then just adjust the date format accordingly.
How to create trading and candlesticks from a websocket stream
Learn how to use Python to craft trading candlesticks from live WebSocket data with the EODHD API. This article breaks down the process in easy-to-follow steps, helping you dive into real-time trading analysis with confidence. Whether you’re a seasoned trader or just getting started, we’ve got you covered with practical insights to turn raw data into actionable strategies.
Receiving the EODHD API’s WebSocket in Python
I will break the code up into sections and build it up from there. It will be easier to understand than looking at the final code without seeing how we got there. Also, you can find how Real-time Websockets API works here.
Accessing the raw websocket stream in Python is fairly straight forward. You will use the “websocket-client” and “threading” libraries. EODHD API’s supports four endpoints, “us“, “us-quote“, “forex” and “crypto“.
This example will use the “websocket-client” library.
python3 -m pip install websocket-client
A very basic example of what this could look like is as follows:
import websocket
import threading
import time
import json
stop_event = threading.Event()
def collect_data():
uri = "wss://ws.eodhistoricaldata.com/ws/crypto?api_token=OeAFFmMliFG5orCUuwAKQ8l4WWFQ67YX"
ws = websocket.create_connection(uri)
payload = {
"action": "subscribe",
"symbols": "BTC-USD",
}
ws.send(json.dumps(payload))
while not stop_event.is_set():
message = ws.recv()
print(message)
time.sleep(0.25)
ws.close()
t = threading.Thread(target=collect_data)
t.start()
time.sleep(0.25)
stop_event.set()
t.join()
I’ve highlighted some points of interest in the code above.
- “crypto” is the endpoint.
- “OeAFFmMliFG5orCUuwAKQ8l4WWFQ67YX” is the demo API key for testing.
- “BTC-USD” is the market to subscribe too, this can be a comma separated list.
- “time.sleep(0.25)” delay between printing on the screen – be careful removing this or reducing it too low.
- “time.sleep(0.25)” duration that the script should run before gracefully exiting.
Creating a WebSocket class
I have already added this to the Official “eodhd” Python library in version 1.0.23.
import websocket
import threading
import signal
import time
import json
import re
import pandas as pd
pd.set_option('display.float_format', '{:.8f}'.format)
class WebSocketClient:
def __init__(
self,
api_key: str,
endpoint: str,
symbols: list,
store_data: bool = False,
display_stream: bool = False,
display_candle_1m: bool = False,
display_candle_5m: bool = False,
display_candle_1h: bool = False,
) -> None:
# Validate API key
prog = re.compile(r"^[A-z0-9.]{16,32}$")
if not prog.match(api_key):
raise ValueError("API key is invalid")
# Validate endpoint
if endpoint not in ["us", "us_quote", "forex", "crypto"]:
raise ValueError("Endpoint is invalid")
# Validate symbol list
if len(symbols) == 0:
raise ValueError("No symbol(s) provided")
# Validate individual symbols
prog = re.compile(r"^[A-z0-9-$]{1,48}$")
for symbol in symbols:
if not prog.match(symbol):
raise ValueError(f"Symbol is invalid: {symbol}")
# Validate max symbol subscriptions
if len(symbols) > 50:
raise ValueError("Max symbol subscription count is 50!")
# Map class arguments to private variables
self._api_key = api_key
self._endpoint = endpoint
self._symbols = symbols
self._store_data = store_data
self._display_stream = display_stream
self._display_candle_1m = display_candle_1m
self._display_candle_5m = display_candle_5m
self._display_candle_1h = display_candle_1h
self.running = True
self.message = None
self.stop_event = threading.Event()
self.data_list = []
self.ws = None
# Register signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
print("Stopping websocket...")
self.running = False
self.stop_event.set()
self.thread.join()
print("Websocket stopped.")
def _floor_to_nearest_interval(self, timestamp_ms, interval):
# Convert to seconds
timestamp_s = timestamp_ms // 1000
# Define the number of seconds for each interval
seconds_per_minute = 60
seconds_per_hour = seconds_per_minute * 60
# Determine the number of seconds for the given interval
if interval == "1 minute":
interval_seconds = seconds_per_minute
elif interval == "5 minutes":
interval_seconds = 5 * seconds_per_minute
elif interval == "1 hour":
interval_seconds = seconds_per_hour
else:
raise ValueError(f"Unsupported interval: {interval}")
# Floor to the nearest interval
floored_s = (timestamp_s // interval_seconds) * interval_seconds
# Convert back to milliseconds
floored_ms = floored_s * 1000
return floored_ms
def _collect_data(self):
self.ws = websocket.create_connection(f"wss://ws.eodhistoricaldata.com/ws/{self._endpoint}?api_token={self._api_key}")
# Send the subscription message
payload = {
"action": "subscribe",
"symbols": ",".join(self._symbols),
}
self.ws.send(json.dumps(payload))
candle_1m = {}
candle_5m = {}
candle_1h = {}
# Collect data until the stop event is set
while not self.stop_event.is_set():
self.message = self.ws.recv()
message_json = json.loads(self.message)
if self._store_data:
self.data_list.append(self.message)
if self._display_stream:
print(self.message)
if self._display_candle_1m:
if "t" in message_json:
candle_date = self._floor_to_nearest_interval(message_json["t"], "1 minute")
if "t" in candle_1m and (candle_date != candle_1m["t"]):
print(candle_1m)
# New candle
candle_1m = {}
candle_1m["t"] = candle_date
if "s" in message_json:
candle_1m["m"] = message_json["s"]
candle_1m["g"] = 60
if "p" in message_json and "o" not in candle_1m:
# Forming candle
candle_1m["o"] = message_json["p"]
candle_1m["h"] = message_json["p"]
candle_1m["l"] = message_json["p"]
candle_1m["c"] = message_json["p"]
if "q" in message_json:
candle_1m["v"] = float(message_json["q"])
elif "p" in message_json and "o" in candle_1m:
# Update candle
candle_1m["c"] = message_json["p"]
if message_json["p"] > candle_1m["h"]:
candle_1m["h"] = message_json["p"]
if message_json["p"] < candle_1m["l"]:
candle_1m["l"] = message_json["p"]
# Sum volume
candle_1m["v"] += float(message_json["q"])
# Uncomment this to see the candle forming
# print(candle_1m)
if self._display_candle_5m:
if "t" in message_json:
candle_date = self._floor_to_nearest_interval(message_json["t"], "5 minutes")
if "t" in candle_5m and (candle_date != candle_5m["t"]):
print(candle_5m)
# New candle
candle_5m = {}
candle_5m["t"] = candle_date
if "s" in message_json:
candle_5m["m"] = message_json["s"]
candle_5m["g"] = 60
if "p" in message_json and "o" not in candle_5m:
# Forming candle
candle_5m["o"] = message_json["p"]
candle_5m["h"] = message_json["p"]
candle_5m["l"] = message_json["p"]
candle_5m["c"] = message_json["p"]
if "q" in message_json:
candle_5m["v"] = float(message_json["q"])
elif "p" in message_json and "o" in candle_5m:
# Update candle
candle_5m["c"] = message_json["p"]
if message_json["p"] > candle_5m["h"]:
candle_5m["h"] = message_json["p"]
if message_json["p"] < candle_5m["l"]:
candle_5m["l"] = message_json["p"]
# Sum volume
candle_5m["v"] += float(message_json["q"])
# Uncomment this to see the candle forming
# print(candle_5m)
if self._display_candle_1h:
if "t" in message_json:
candle_date = self._floor_to_nearest_interval(message_json["t"], "1 hour")
if "t" in candle_1h and (candle_date != candle_1h["t"]):
print(candle_1h)
# New candle
candle_1h = {}
candle_1h["t"] = candle_date
if "s" in message_json:
candle_1h["m"] = message_json["s"]
candle_1h["g"] = 60
if "p" in message_json and "o" not in candle_1h:
# Forming candle
candle_1h["o"] = message_json["p"]
candle_1h["h"] = message_json["p"]
candle_1h["l"] = message_json["p"]
candle_1h["c"] = message_json["p"]
if "q" in message_json:
candle_1h["v"] = float(message_json["q"])
elif "p" in message_json and "o" in candle_1h:
# Update candle
candle_1h["c"] = message_json["p"]
if message_json["p"] > candle_1h["h"]:
candle_1h["h"] = message_json["p"]
if message_json["p"] < candle_1h["l"]:
candle_1h["l"] = message_json["p"]
# Sum volume
candle_1h["v"] += float(message_json["q"])
# Uncomment this to see the candle forming
# print(candle_1h)
# Close the WebSocket connection
self.ws.close()
def _keepalive(self, interval=30):
if (self.ws is not None) and (hasattr(self.ws, "connected")):
while self.ws.connected:
self.ws.ping("keepalive")
time.sleep(interval)
def start(self):
self.thread = threading.Thread(target=self._collect_data)
self.keepalive = threading.Thread(target=self._keepalive)
self.thread.start()
self.keepalive.start()
def stop(self):
self.stop_event.set()
self.thread.join()
self.keepalive.join()
def get_data(self):
return self.data_list
if __name__ == "__main__":
client = WebSocketClient(
api_key="OeAFFmMliFG5orCUuwAKQ8l4WWFQ67YX",
endpoint="crypto",
symbols=["BTC-USD"],
display_stream=False,
display_candle_1m=True,
display_candle_5m=False,
display_candle_1h=False,
)
client.start()
try:
while client.running:
time.sleep(1)
except KeyboardInterrupt:
print("\nStopping due to user request.")
client.stop()
You don’t need to add all this code yourself now. You just need to access the code via the “eodhd” library like this.
import time
from eodhd import WebSocketClient
client = WebSocketClient(
api_key="OeAFFmMliFG5orCUuwAKQ8l4WWFQ67YX",
endpoint="crypto",
symbols=["BTC-USD"],
display_stream=True,
display_candle_1m=False,
display_candle_5m=False,
display_candle_1h=False,
)
client.start()
try:
msg = None
while client.running:
if msg is not None and client.message != msg:
print(client.message)
msg = client.message
except KeyboardInterrupt:
print("\nStopping due to user request.")
client.stop()
Creating Trading Candles from Websocket data
A websocket trading data stream is great if you want to create a fancy dashboard and ticker, as the data being received is real-time. It’s not all that great if you want to store the data and analyse it.
In order to do this we’ll need to create trading candlesticks for a desired interval. The way it will work is as follows:
- Collect the stream data for a current candle.
- Store the first price in the “open”, “high”, “low”, “close” fields.
- Store the volume in the “volume” field.
- For subsequent prices, if the price is higher than the “high” then update it, if the price is lower than the “low” then update it, store the price in the “close”, and add the current volume to the “volume”.
- When the candle completes finalise it and start over.
At the end of the candlestick period we should have a standard OHLCV candle that shows the opening price, high within the interval, low within the interval”, the last price of the interval in “close”, and a sum of all the volume in “volume”.
So that’s the theory, how does it look in practice?
A normal “crypto” websocket stream looks like this:
We are going to want to extract the market (“s”), price (“p”), volume (“q”), and the unix timestamp in milliseconds (“t”).
I’ll show you how to create a 1 minute candle, the process will be the same for the others. I have actually added this code into the Official EODHD Python library code as well for reference (“webstocketclient.py“).
We will need to calculate the floor of the closest minute to the unix timestamp. I’ve created a function that will do this.
def _floor_to_nearest_interval(self, timestamp_ms, interval):
# Convert to seconds
timestamp_s = timestamp_ms // 1000
# Define the number of seconds for each interval
seconds_per_minute = 60
seconds_per_hour = seconds_per_minute * 60
seconds_per_day = seconds_per_hour * 24
# Determine the number of seconds for the given interval
if interval == "1 minute":
interval_seconds = seconds_per_minute
elif interval == "5 minutes":
interval_seconds = 5 * seconds_per_minute
elif interval == "1 hour":
interval_seconds = seconds_per_hour
else:
raise ValueError(f"Unsupported interval: {interval}")
# Floor to the nearest interval
floored_s = (timestamp_s // interval_seconds) * interval_seconds
# Convert back to milliseconds
floored_ms = floored_s * 1000
return floored_ms
So this _floor_to_nearest_interval(1695155147166, “1 minute”) will return 1695155100000. Every time the next minute starts it will be rounded down to the floor of the current minute.
We will want to start with an empty candle called “candle_1m” that we will reset every minute.
candle_1m = {}
And the process to creating 1 minute candles will look like this:
# Collect data until the stop event is set
while not self.stop_event.is_set():
self.message = self.ws.recv()
message_json = json.loads(self.message)
if "t" in message_json:
candle_date = self._floor_to_nearest_interval(message_json["t"], "1 minute")
if "t" in candle_1m and (candle_date != candle_1m["t"]):
print(candle_1m)
# New candle
candle_1m = {}
candle_1m["t"] = candle_date
if "s" in message_json:
candle_1m["m"] = message_json["s"]
candle_1m["g"] = 60
if "p" in message_json and "o" not in candle_1m:
# Forming candle
candle_1m["o"] = message_json["p"]
candle_1m["h"] = message_json["p"]
candle_1m["l"] = message_json["p"]
candle_1m["c"] = message_json["p"]
if "q" in message_json:
candle_1m["v"] = float(message_json["q"])
elif "p" in message_json and "o" in candle_1m:
# Update candle
candle_1m["c"] = message_json["p"]
if message_json["p"] > candle_1m["h"]:
candle_1m["h"] = message_json["p"]
if message_json["p"] < candle_1m["l"]:
candle_1m["l"] = message_json["p"]
# Sum volume
candle_1m["v"] += float(message_json["q"])
# Uncomment this to see the candle forming
#print(candle_1m)
Hopefully you can see what’s happening here…
- A new websocket entry is received as per the screenshot above.
- Convert the websocket string data to a JSON object.
- If “t” exists in the message, run our function to calculate the floor.
- All subsequent floored timestamps will be associated with the same candle, if it changes it assumes it is a new candle and resets the candle_1m object.
- Add the market to the candle as “m”.
- Add the granularity / interval to the candle as “g”.
- If the candle is newly created set the price “p” in “o” for open, “h” for high”, “l” for low, “c” for close, and “v” for volume.
- For subsequent messages that are part of the same candle, if the price is higher than the high then update it, if the price is lower than the low then update it, and add the volume to the volume.
The code will only print a completed candle so there will be < 60 second delay before you see anything. If you want to see the candle forming in real time, you can comment out that print at the bottom.
If you want to run this code using the “eodhd” library you can run it like this…
import time
import pandas as pd
from eodhd import WebSocketClient
pd.set_option('display.float_format', '{:.8f}'.format)
client = WebSocketClient(
api_key="OeAFFmMliFG5orCUuwAKQ8l4WWFQ67YX",
endpoint="crypto",
symbols=["BTC-USD"],
display_stream=False,
display_candle_1m=True,
display_candle_5m=False,
display_candle_1h=False,
)
client.start()
try:
while client.running:
time.sleep(1)
except KeyboardInterrupt:
print("\nStopping due to user request.")
client.stop()
The API key above is the demo API key, you should replace it with your own.
I’ve set it up so you can toggle the data on and off depending on what you want to see.
Give it ago, if you have any issues feel free to add an Issue in the Github repo and I’ll take a look.
Join the discussion at forum.eodhd.com