Introduction

This method will work with any websocket data that contains a timestamp and a price. I’m going to demonstrate it using EODHD’s “crypto” websocket end-point. It’s important to note that the websocket end-points for different asset types may present their data differently, in which case you will need to adjust the code. For example, in the “crypto” end-point the price is represented as “p” and the timestamp as “t”. If you are going to try this with the “us”, “us-quote”, or “forex” end-points, check that the fields have the same names there; if they don’t, just map the fields accordingly. It is also worth checking the format of the timestamp field. For the “crypto” example below it is a UNIX timestamp in milliseconds. If your source data uses a different format, adjust the date handling accordingly.
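As a rough sketch of that mapping step, the helper below renames end-point specific keys to a common shape. The names `FIELD_MAPS` and `normalize_tick` are made up for illustration, and only the “crypto” entry (“p” and “t”) comes from this article; the keys for any other end-point are not assumed here and should be confirmed in its documentation before being added.

```python
# Only the "crypto" mapping is taken from this article. Entries for other
# end-points are deliberately left out: confirm their field names first.
FIELD_MAPS = {
    "crypto": {"price": "p", "timestamp": "t"},
}


def normalize_tick(message: dict, endpoint: str) -> dict:
    """Return a dict with common 'price' and 'timestamp_ms' keys."""
    field_map = FIELD_MAPS[endpoint]
    return {
        # float() accepts both numeric values and numeric strings
        "price": float(message[field_map["price"]]),
        "timestamp_ms": int(message[field_map["timestamp"]]),
    }


# Illustrative message, not captured from the live feed
tick = normalize_tick({"p": "26657.29", "t": 1695155147166}, "crypto")
print(tick)
```

With a table like this, supporting another end-point would mostly be a matter of adding one more entry rather than touching the candle logic.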

How to create trading candlesticks from a websocket stream

Learn how to use Python to craft trading candlesticks from live WebSocket data with the EODHD API. This article breaks down the process in easy-to-follow steps, helping you dive into real-time trading analysis with confidence. Whether you’re a seasoned trader or just getting started, we’ve got you covered with practical insights to turn raw data into actionable strategies.

Receiving the EODHD API’s WebSocket in Python

I will break the code up into sections and build it up from there. It will be easier to understand than looking at the final code without seeing how we got there. Also, you can find how Real-time Websockets API works here.

Accessing the raw websocket stream in Python is fairly straightforward. You will use the “websocket-client” and “threading” libraries. The EODHD API supports four endpoints: “us”, “us-quote”, “forex” and “crypto”.

The “threading” module ships with Python, but “websocket-client” needs to be installed:

python3 -m pip install websocket-client

A very basic example of what this could look like is as follows:

import websocket
import threading
import time
import json

stop_event = threading.Event()

def collect_data():
    uri = "wss://ws.eodhistoricaldata.com/ws/crypto?api_token=OeAFFmMliFG5orCUuwAKQ8l4WWFQ67YX"

    ws = websocket.create_connection(uri)

    payload = {
        "action": "subscribe",
        "symbols": "BTC-USD",
    }
    ws.send(json.dumps(payload))

    while not stop_event.is_set():
        message = ws.recv()
        print(message)
        time.sleep(0.25)

    ws.close()


t = threading.Thread(target=collect_data)
t.start()

time.sleep(0.25)

stop_event.set()
t.join()

Some points of interest in the code above:

  1. “crypto” is the endpoint.
  2. “OeAFFmMliFG5orCUuwAKQ8l4WWFQ67YX” is the demo API key for testing.
  3. “BTC-USD” is the market to subscribe to; this can be a comma-separated list.
  4. “time.sleep(0.25)” inside the loop is the delay between printing messages on the screen. Be careful about removing it or reducing it too far.
  5. “time.sleep(0.25)” after “t.start()” is the duration the script should run before gracefully exiting.
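Point 3 can be sketched as a small helper that turns a Python list of markets into the comma-separated subscription message. The function name `build_subscription` is my own, and “ETH-USD” is only an assumed second symbol for the example.

```python
import json


def build_subscription(symbols: list) -> str:
    """Build the JSON subscribe message for one or more markets."""
    if not symbols:
        raise ValueError("No symbol(s) provided")
    return json.dumps({"action": "subscribe", "symbols": ",".join(symbols)})


# "ETH-USD" is an assumed example symbol
print(build_subscription(["BTC-USD", "ETH-USD"]))
```

The result is a string that can be passed straight to `ws.send(...)` in place of the hand-built payload.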

Creating a WebSocket class

I have already added this to the Official “eodhd” Python library in version 1.0.23.

import websocket
import threading
import signal
import time
import json
import re
import pandas as pd

pd.set_option('display.float_format', '{:.8f}'.format)


class WebSocketClient:
    def __init__(
        self,
        api_key: str,
        endpoint: str,
        symbols: list,
        store_data: bool = False,
        display_stream: bool = False,
        display_candle_1m: bool = False,
        display_candle_5m: bool = False,
        display_candle_1h: bool = False,
    ) -> None:
        # Validate API key
        prog = re.compile(r"^[A-Za-z0-9.]{16,32}$")
        if not prog.match(api_key):
            raise ValueError("API key is invalid")

        # Validate endpoint
        if endpoint not in ["us", "us-quote", "forex", "crypto"]:
            raise ValueError("Endpoint is invalid")

        # Validate symbol list
        if len(symbols) == 0:
            raise ValueError("No symbol(s) provided")

        # Validate individual symbols
        prog = re.compile(r"^[A-Za-z0-9$-]{1,48}$")
        for symbol in symbols:
            if not prog.match(symbol):
                raise ValueError(f"Symbol is invalid: {symbol}")

        # Validate max symbol subscriptions
        if len(symbols) > 50:
            raise ValueError("Max symbol subscription count is 50!")

        # Map class arguments to private variables
        self._api_key = api_key
        self._endpoint = endpoint
        self._symbols = symbols
        self._store_data = store_data
        self._display_stream = display_stream
        self._display_candle_1m = display_candle_1m
        self._display_candle_5m = display_candle_5m
        self._display_candle_1h = display_candle_1h

        self.running = True
        self.message = None
        self.stop_event = threading.Event()
        self.data_list = []
        self.ws = None

        # Register signal handlers
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        print("Stopping websocket...")
        self.running = False
        self.stop_event.set()
        self.thread.join()
        print("Websocket stopped.")

    def _floor_to_nearest_interval(self, timestamp_ms, interval):
        # Convert to seconds
        timestamp_s = timestamp_ms // 1000

        # Define the number of seconds for each interval
        seconds_per_minute = 60
        seconds_per_hour = seconds_per_minute * 60

        # Determine the number of seconds for the given interval
        if interval == "1 minute":
            interval_seconds = seconds_per_minute
        elif interval == "5 minutes":
            interval_seconds = 5 * seconds_per_minute
        elif interval == "1 hour":
            interval_seconds = seconds_per_hour
        else:
            raise ValueError(f"Unsupported interval: {interval}")

        # Floor to the nearest interval
        floored_s = (timestamp_s // interval_seconds) * interval_seconds

        # Convert back to milliseconds
        floored_ms = floored_s * 1000

        return floored_ms

    def _collect_data(self):
        self.ws = websocket.create_connection(f"wss://ws.eodhistoricaldata.com/ws/{self._endpoint}?api_token={self._api_key}")

        # Send the subscription message
        payload = {
            "action": "subscribe",
            "symbols": ",".join(self._symbols),
        }
        self.ws.send(json.dumps(payload))

        candle_1m = {}
        candle_5m = {}
        candle_1h = {}

        # Collect data until the stop event is set
        while not self.stop_event.is_set():
            self.message = self.ws.recv()
            message_json = json.loads(self.message)

            if self._store_data:
                self.data_list.append(self.message)

            if self._display_stream:
                print(self.message)

            if self._display_candle_1m:
                if "t" in message_json:
                    candle_date = self._floor_to_nearest_interval(message_json["t"], "1 minute")

                    if "t" in candle_1m and (candle_date != candle_1m["t"]):
                        print(candle_1m)

                        # New candle
                        candle_1m = {}

                    candle_1m["t"] = candle_date

                if "s" in message_json:
                    candle_1m["m"] = message_json["s"]
                    candle_1m["g"] = 60

                if "p" in message_json and "o" not in candle_1m:
                    # Forming candle
                    candle_1m["o"] = message_json["p"]
                    candle_1m["h"] = message_json["p"]
                    candle_1m["l"] = message_json["p"]
                    candle_1m["c"] = message_json["p"]
                    if "q" in message_json:
                        candle_1m["v"] = float(message_json["q"])
                elif "p" in message_json and "o" in candle_1m:
                    # Update candle
                    candle_1m["c"] = message_json["p"]

                    if message_json["p"] > candle_1m["h"]:
                        candle_1m["h"] = message_json["p"]

                    if message_json["p"] < candle_1m["l"]:
                        candle_1m["l"] = message_json["p"]

                    # Sum volume (tolerates a missing "q" or a candle without "v" yet)
                    candle_1m["v"] = candle_1m.get("v", 0.0) + float(message_json.get("q", 0))

                # Uncomment this to see the candle forming
                # print(candle_1m)

            if self._display_candle_5m:
                if "t" in message_json:
                    candle_date = self._floor_to_nearest_interval(message_json["t"], "5 minutes")

                    if "t" in candle_5m and (candle_date != candle_5m["t"]):
                        print(candle_5m)

                        # New candle
                        candle_5m = {}

                    candle_5m["t"] = candle_date

                if "s" in message_json:
                    candle_5m["m"] = message_json["s"]
                    candle_5m["g"] = 300

                if "p" in message_json and "o" not in candle_5m:
                    # Forming candle
                    candle_5m["o"] = message_json["p"]
                    candle_5m["h"] = message_json["p"]
                    candle_5m["l"] = message_json["p"]
                    candle_5m["c"] = message_json["p"]
                    if "q" in message_json:
                        candle_5m["v"] = float(message_json["q"])
                elif "p" in message_json and "o" in candle_5m:
                    # Update candle
                    candle_5m["c"] = message_json["p"]

                    if message_json["p"] > candle_5m["h"]:
                        candle_5m["h"] = message_json["p"]

                    if message_json["p"] < candle_5m["l"]:
                        candle_5m["l"] = message_json["p"]

                    # Sum volume (tolerates a missing "q" or a candle without "v" yet)
                    candle_5m["v"] = candle_5m.get("v", 0.0) + float(message_json.get("q", 0))

                # Uncomment this to see the candle forming
                # print(candle_5m)

            if self._display_candle_1h:
                if "t" in message_json:
                    candle_date = self._floor_to_nearest_interval(message_json["t"], "1 hour")

                    if "t" in candle_1h and (candle_date != candle_1h["t"]):
                        print(candle_1h)

                        # New candle
                        candle_1h = {}

                    candle_1h["t"] = candle_date

                if "s" in message_json:
                    candle_1h["m"] = message_json["s"]
                    candle_1h["g"] = 3600

                if "p" in message_json and "o" not in candle_1h:
                    # Forming candle
                    candle_1h["o"] = message_json["p"]
                    candle_1h["h"] = message_json["p"]
                    candle_1h["l"] = message_json["p"]
                    candle_1h["c"] = message_json["p"]
                    if "q" in message_json:
                        candle_1h["v"] = float(message_json["q"])
                elif "p" in message_json and "o" in candle_1h:
                    # Update candle
                    candle_1h["c"] = message_json["p"]

                    if message_json["p"] > candle_1h["h"]:
                        candle_1h["h"] = message_json["p"]

                    if message_json["p"] < candle_1h["l"]:
                        candle_1h["l"] = message_json["p"]

                    # Sum volume (tolerates a missing "q" or a candle without "v" yet)
                    candle_1h["v"] = candle_1h.get("v", 0.0) + float(message_json.get("q", 0))

                # Uncomment this to see the candle forming
                # print(candle_1h)

        # Close the WebSocket connection
        self.ws.close()

    def _keepalive(self, interval=30):
        if (self.ws is not None) and (hasattr(self.ws, "connected")):
            while self.ws.connected:
                self.ws.ping("keepalive")
                time.sleep(interval)

    def start(self):
        self.thread = threading.Thread(target=self._collect_data)
        self.keepalive = threading.Thread(target=self._keepalive)
        self.thread.start()
        self.keepalive.start()

    def stop(self):
        self.stop_event.set()
        self.thread.join()
        self.keepalive.join()

    def get_data(self):
        return self.data_list


if __name__ == "__main__":
    client = WebSocketClient(
        api_key="OeAFFmMliFG5orCUuwAKQ8l4WWFQ67YX",
        endpoint="crypto",
        symbols=["BTC-USD"],
        display_stream=False,
        display_candle_1m=True,
        display_candle_5m=False,
        display_candle_1h=False,
    )
    client.start()

    try:
        while client.running:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping due to user request.")
        client.stop()

You don’t need to add all this code yourself now. You just need to access the code via the “eodhd” library like this.

import time
from eodhd import WebSocketClient

client = WebSocketClient(
    api_key="OeAFFmMliFG5orCUuwAKQ8l4WWFQ67YX",
    endpoint="crypto",
    symbols=["BTC-USD"],
    display_stream=True,
    display_candle_1m=False,
    display_candle_5m=False,
    display_candle_1h=False,
)
client.start()

try:
    msg = None
    while client.running:
        if msg is not None and client.message != msg:
            print(client.message)
        msg = client.message
        time.sleep(0.01)  # brief pause so the loop does not spin the CPU
except KeyboardInterrupt:
    print("\nStopping due to user request.")
    client.stop()

Creating Trading Candles from Websocket data

A websocket trading data stream is great if you want to create a fancy dashboard and ticker, as the data being received is real-time. It’s not all that great if you want to store the data and analyse it.

In order to do this we’ll need to create trading candlesticks for a desired interval. The way it will work is as follows:

  • Collect the stream data for a current candle.
  • Store the first price in the “open”, “high”, “low”, “close” fields.
  • Store the volume in the “volume” field.
  • For subsequent prices, if the price is higher than the “high” then update it, if the price is lower than the “low” then update it, store the price in the “close”, and add the current volume to the “volume”.
  • When the candle completes finalise it and start over.

At the end of the candlestick period we should have a standard OHLCV candle: the opening price in “open”, the highest price within the interval in “high”, the lowest price within the interval in “low”, the last price of the interval in “close”, and the sum of all the volume in “volume”.
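The steps above can be sketched as one small function. This is a minimal illustration with made-up prices and volumes and a function name of my own (`update_candle`); it uses the long field names from the list rather than the short keys used in the library code.

```python
def update_candle(candle: dict, price: float, volume: float) -> dict:
    """Fold one price/volume pair into a forming OHLCV candle."""
    if "open" not in candle:
        # First price of the interval seeds all four price fields
        candle.update(open=price, high=price, low=price, close=price, volume=volume)
    else:
        candle["high"] = max(candle["high"], price)
        candle["low"] = min(candle["low"], price)
        candle["close"] = price
        candle["volume"] += volume
    return candle


# Made-up (price, volume) pairs for a single interval
candle = {}
for price, volume in [(100.0, 1.0), (105.0, 0.5), (98.0, 2.0), (101.0, 0.25)]:
    update_candle(candle, price, volume)

print(candle)
```

Deciding when an interval has ended is deliberately left out here, as that depends on the timestamp handling covered further down.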

So that’s the theory, how does it look in practice?

Each message in a normal “crypto” websocket stream arrives as a JSON string.

We are going to want to extract the market (“s”), price (“p”), volume (“q”), and the Unix timestamp in milliseconds (“t”).
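To make the extraction concrete, here is a small sketch that pulls those four fields out of one raw message. The sample message is invented for illustration (it is not a captured feed message), and `parse_tick` is a hypothetical helper. I’m assuming the values may arrive either as numbers or as numeric strings, which `float()` handles in both cases.

```python
import json
from datetime import datetime, timezone


def parse_tick(raw: str):
    """Return market, price, volume and time, or None if a field is missing."""
    message = json.loads(raw)
    if not all(key in message for key in ("s", "p", "q", "t")):
        # Not a trade message (or an incomplete one), so skip it
        return None
    return {
        "market": message["s"],
        "price": float(message["p"]),
        "volume": float(message["q"]),
        # "t" is in milliseconds, datetime expects seconds
        "time": datetime.fromtimestamp(message["t"] / 1000, tz=timezone.utc),
    }


# Invented sample message for illustration only
sample = '{"s": "BTC-USD", "p": "27000.5", "q": "0.01", "t": 1695155147166}'
print(parse_tick(sample))
```

Returning `None` for anything without all four keys keeps the candle logic from tripping over messages that are not trades.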

I’ll show you how to create a 1 minute candle; the process is the same for the others. I have also added this code to the Official EODHD Python library for reference (“websocketclient.py”).

We will need to calculate the floor of the closest minute to the unix timestamp. I’ve created a function that will do this.

def _floor_to_nearest_interval(self, timestamp_ms, interval):
    # Convert to seconds
    timestamp_s = timestamp_ms // 1000

    # Define the number of seconds for each interval
    seconds_per_minute = 60
    seconds_per_hour = seconds_per_minute * 60

    # Determine the number of seconds for the given interval
    if interval == "1 minute":
        interval_seconds = seconds_per_minute
    elif interval == "5 minutes":
        interval_seconds = 5 * seconds_per_minute
    elif interval == "1 hour":
        interval_seconds = seconds_per_hour
    else:
        raise ValueError(f"Unsupported interval: {interval}")

    # Floor to the nearest interval
    floored_s = (timestamp_s // interval_seconds) * interval_seconds

    # Convert back to milliseconds
    floored_ms = floored_s * 1000

    return floored_ms

So _floor_to_nearest_interval(1695155147166, “1 minute”) will return 1695155100000. Every timestamp within the same minute floors to the same value, so the result only changes when a new minute starts.
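As a quick check of that arithmetic, the standalone sketch below floors the same timestamp to each of the three intervals and prints the result as a UTC time. `floor_ms` is a simplified stand-in written for this example (it takes the interval in seconds instead of a label) and is not the library method itself.

```python
from datetime import datetime, timezone


def floor_ms(timestamp_ms: int, interval_seconds: int) -> int:
    """Floor a millisecond timestamp to the start of its interval."""
    interval_ms = interval_seconds * 1000
    return (timestamp_ms // interval_ms) * interval_ms


timestamp_ms = 1695155147166
for label, seconds in (("1 minute", 60), ("5 minutes", 300), ("1 hour", 3600)):
    floored = floor_ms(timestamp_ms, seconds)
    as_utc = datetime.fromtimestamp(floored / 1000, tz=timezone.utc)
    print(label, floored, as_utc.isoformat())
```

For this particular timestamp the 1 minute and 5 minute floors coincide at 20:25:00 UTC, while the 1 hour floor falls back to 20:00:00 UTC.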

We will want to start with an empty candle called “candle_1m” that we will reset every minute.

candle_1m = {}

And the process for creating 1 minute candles will look like this:

# Collect data until the stop event is set
while not self.stop_event.is_set():
    self.message = self.ws.recv()
    message_json = json.loads(self.message)

    if "t" in message_json:
        candle_date = self._floor_to_nearest_interval(message_json["t"], "1 minute")

        if "t" in candle_1m and (candle_date != candle_1m["t"]):
            print(candle_1m)

            # New candle
            candle_1m = {}

        candle_1m["t"] = candle_date

    if "s" in message_json:
        candle_1m["m"] = message_json["s"]
        candle_1m["g"] = 60

    if "p" in message_json and "o" not in candle_1m:
        # Forming candle
        candle_1m["o"] = message_json["p"]
        candle_1m["h"] = message_json["p"]
        candle_1m["l"] = message_json["p"]
        candle_1m["c"] = message_json["p"]
        if "q" in message_json:
            candle_1m["v"] = float(message_json["q"])
    elif "p" in message_json and "o" in candle_1m:
        # Update candle
        candle_1m["c"] = message_json["p"]

        if message_json["p"] > candle_1m["h"]:
            candle_1m["h"] = message_json["p"]

        if message_json["p"] < candle_1m["l"]:
            candle_1m["l"] = message_json["p"]

        # Sum volume (tolerates a missing "q" or a candle without "v" yet)
        candle_1m["v"] = candle_1m.get("v", 0.0) + float(message_json.get("q", 0))

    # Uncomment this to see the candle forming
    #print(candle_1m)

Hopefully you can see what’s happening here…

  1. A new websocket message is received.
  2. Convert the websocket string data to a JSON object.
  3. If “t” exists in the message, run our function to calculate the floor.
  4. All messages with the same floored timestamp belong to the same candle. If the floored timestamp changes, the code assumes a new candle has started, prints the completed one, and resets the candle_1m object.
  5. Add the market to the candle as “m”.
  6. Add the granularity / interval in seconds to the candle as “g”.
  7. If the candle is newly created, set the price “p” in “o” for open, “h” for high, “l” for low, and “c” for close, and set the quantity “q” in “v” for volume.
  8. For subsequent messages that are part of the same candle, if the price is higher than the high then update it, if the price is lower than the low then update it, store the price in the close, and add the volume to the volume.

The code only prints a completed candle, so there will be a delay of up to 60 seconds before you see anything. If you want to see the candle forming in real time, you can uncomment the print at the bottom.
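Since the 1 minute, 5 minute and 1 hour candles all follow the same steps, the logic could also be wrapped in one reusable object. The `CandleBuilder` class below is my own sketch of that idea, not what the “eodhd” library does; it reuses the short keys (“t”, “m”, “g”, “o”, “h”, “l”, “c”, “v”) from the code above and is fed made-up ticks.

```python
class CandleBuilder:
    """Builds OHLCV candles for one market at one fixed interval."""

    def __init__(self, interval_seconds: int) -> None:
        self.interval_seconds = interval_seconds
        self.interval_ms = interval_seconds * 1000
        self.candle = {}

    def add_tick(self, market, price, volume, timestamp_ms):
        """Apply one tick; return the finished candle on rollover, else None."""
        start = (timestamp_ms // self.interval_ms) * self.interval_ms

        completed = None
        if self.candle and self.candle["t"] != start:
            # The tick belongs to a new interval: hand back the old candle
            completed = self.candle
            self.candle = {}

        if not self.candle:
            self.candle = {
                "t": start, "m": market, "g": self.interval_seconds,
                "o": price, "h": price, "l": price, "c": price, "v": volume,
            }
        else:
            self.candle["h"] = max(self.candle["h"], price)
            self.candle["l"] = min(self.candle["l"], price)
            self.candle["c"] = price
            self.candle["v"] += volume

        return completed


# Made-up ticks: two in one minute, then one in the next minute
builder = CandleBuilder(60)
ticks = [
    (1695155147166, 100.0, 1.0),
    (1695155150000, 102.0, 2.0),
    (1695155160000, 99.0, 1.0),
]
finished = []
for timestamp_ms, price, volume in ticks:
    done = builder.add_tick("BTC-USD", price, volume, timestamp_ms)
    if done is not None:
        finished.append(done)
        print(done)
```

One builder tracks a single market at a single interval, so when subscribing to several symbols it would make sense to keep one builder per symbol and interval, otherwise prices from different markets end up in the same candle.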

If you want to run this code using the “eodhd” library you can run it like this…

import time
import pandas as pd
from eodhd import WebSocketClient

pd.set_option('display.float_format', '{:.8f}'.format)

client = WebSocketClient(
    api_key="OeAFFmMliFG5orCUuwAKQ8l4WWFQ67YX",
    endpoint="crypto",
    symbols=["BTC-USD"],
    display_stream=False,
    display_candle_1m=True,
    display_candle_5m=False,
    display_candle_1h=False,
)
client.start()

try:
    while client.running:
        time.sleep(1)
except KeyboardInterrupt:
    print("\nStopping due to user request.")
    client.stop()

The API key above is the demo API key; you should replace it with your own.

I’ve set it up so you can toggle the data on and off depending on what you want to see.

Give it a go. If you have any issues, feel free to open an Issue in the GitHub repo and I’ll take a look.
