This tutorial uses a dataset that contains second-by-second stock-trade data for the top 100 most-traded symbols, in a hypertable named `stocks_real_time`. It also includes a separate table of company symbols and company names, in a regular PostgreSQL table named `company`.
A service in Timescale is a cloud instance which contains your database. Each service contains a single database, named `tsdb`.
You can connect to a service from your local system using the `psql` command-line utility. If you've used PostgreSQL before, you might already have `psql` installed. If not, check out the installing psql section.
1.  In the Timescale portal, click **Create service**.
2.  Click **Download the cheatsheet** to download an SQL file that contains the login details for your new service. You can also copy the details directly from this page. When you have copied your password, click **I stored my password, go to service overview** at the bottom of the page.
3.  When your service is ready to use, it shows a green **Running** label in the **Service Overview**. You also receive an email confirming that your service is ready to use.
4.  On your local system, at the command prompt, connect to the service using the **Service URL** from the SQL file that you downloaded. When you are prompted, enter the password:

    ```bash
    psql -x "<SERVICE_URL>"
    Password for user tsdbadmin:
    ```

    If your connection is successful, you'll see a message like this, followed by the `psql` prompt:

    ```
    psql (13.3, server 12.8 (Ubuntu 12.8-1.pgdg21.04+1))
    SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
    Type "help" for help.
    tsdb=>
    ```
When you connect to the Twelve Data API through a websocket, you create a persistent connection between your computer and the websocket server. You set up a Python environment, and pass two arguments to create a websocket object and establish the connection.
Create a new Python virtual environment for this project and activate it. All the packages you need for this tutorial are installed in this environment.
1.  Create and activate a Python virtual environment:

    ```bash
    virtualenv env
    source env/bin/activate
    ```

2.  Install the Twelve Data Python wrapper library with websocket support. This library allows you to make requests to the API and maintain a stable websocket connection:

    ```bash
    pip install twelvedata websocket-client
    ```

3.  Install Psycopg2 so that you can connect to TimescaleDB from your Python script:

    ```bash
    pip install psycopg2-binary
    ```
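Optionally, verify that Psycopg2 can reach your service from Python. This is a minimal sketch; substitute the connection details for your Timescale service, and it simply prints the server version and exits:

```python
import psycopg2

# Substitute the connection details for your Timescale service
conn = psycopg2.connect(
    database="tsdb",
    host="<HOST>",
    user="tsdbadmin",
    password="<PASSWORD>",
    port="<PORT>",
)

with conn.cursor() as cursor:
    cursor.execute("SELECT version();")
    print(cursor.fetchone()[0])  # prints the PostgreSQL version string

conn.close()
```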
A persistent connection between your computer and the websocket server is used to receive data for as long as the connection is maintained. You need to pass two arguments to create a websocket object and establish the connection.
`on_event`

This argument needs to be a function that is invoked whenever a new data record is received from the websocket:

```python
def on_event(event):
    print(event)  # prints out the data record (dictionary)
```

This is where you implement the ingestion logic, so that whenever new data is available, you insert it into the database.
`symbols`

This argument needs to be a list of stock ticker symbols (for example, `MSFT`) or crypto trading pairs (for example, `BTC/USD`). When you use a websocket connection, you always need to subscribe to the events you want to receive. You can do this with the `symbols` argument, or, if your connection is already created, you can use the `subscribe()` function to get data for additional symbols, as in the sketch below.
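Putting the two arguments together, a minimal sketch with a placeholder API key looks like this; the complete example in the next step expands on the same pattern:

```python
from twelvedata import TDClient

def on_event(event):
    print(event)  # handle each incoming data record

td = TDClient(apikey="<YOUR_API_KEY>")

# Subscribe to an initial set of symbols when creating the websocket object
ws = td.websocket(symbols=["BTC/USD", "ETH/USD"], on_event=on_event)

# Subscribe to additional symbols on the existing connection
ws.subscribe(["AAPL", "MSFT"])
ws.connect()
```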
Create a new Python file called `websocket_test.py` and connect to the Twelve Data servers using your API key, `<YOUR_API_KEY>`:

```python
import time
from twelvedata import TDClient

messages_history = []

def on_event(event):
    print(event)  # prints out the data record (dictionary)
    messages_history.append(event)

td = TDClient(apikey="<YOUR_API_KEY>")
ws = td.websocket(symbols=["BTC/USD", "ETH/USD"], on_event=on_event)
ws.subscribe(['ETH/BTC', 'AAPL'])
ws.connect()
while True:
    print('messages received: ', len(messages_history))
    ws.heartbeat()
    time.sleep(10)
```

Run the Python script:

```bash
python websocket_test.py
```

When you run the script, you receive a response from the server about the status of your connection:
```
{'event': 'subscribe-status',
 'status': 'ok',
 'success': [
    {'symbol': 'BTC/USD', 'exchange': 'Coinbase Pro', 'mic_code': 'Coinbase Pro', 'country': '', 'type': 'Digital Currency'},
    {'symbol': 'ETH/USD', 'exchange': 'Huobi', 'mic_code': 'Huobi', 'country': '', 'type': 'Digital Currency'}
 ],
 'fails': None}
```

When you have established a connection to the websocket server, wait a few seconds, and you can see data records, like this:

```
{'event': 'price', 'symbol': 'BTC/USD', 'currency_base': 'Bitcoin', 'currency_quote': 'US Dollar', 'exchange': 'Coinbase Pro', 'type': 'Digital Currency', 'timestamp': 1652438893, 'price': 30361.2, 'bid': 30361.2, 'ask': 30361.2, 'day_volume': 49153}
{'event': 'price', 'symbol': 'BTC/USD', 'currency_base': 'Bitcoin', 'currency_quote': 'US Dollar', 'exchange': 'Coinbase Pro', 'type': 'Digital Currency', 'timestamp': 1652438896, 'price': 30380.6, 'bid': 30380.6, 'ask': 30380.6, 'day_volume': 49157}
{'event': 'heartbeat', 'status': 'ok'}
{'event': 'price', 'symbol': 'ETH/USD', 'currency_base': 'Ethereum', 'currency_quote': 'US Dollar', 'exchange': 'Huobi', 'type': 'Digital Currency', 'timestamp': 1652438899, 'price': 2089.07, 'bid': 2089.02, 'ask': 2089.03, 'day_volume': 193818}
{'event': 'price', 'symbol': 'BTC/USD', 'currency_base': 'Bitcoin', 'currency_quote': 'US Dollar', 'exchange': 'Coinbase Pro', 'type': 'Digital Currency', 'timestamp': 1652438900, 'price': 30346.0, 'bid': 30346.0, 'ask': 30346.0, 'day_volume': 49167}
```

Each price event gives you multiple data points about the given trading pair, such as the name of the exchange and the current price. You can also occasionally see `heartbeat` events in the response; these events signal the health of the connection over time. At this point, the websocket connection is working successfully and passing data.
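In the ingestion step that follows, the event handler typically keeps only `price` events and skips connection metadata. A minimal, hypothetical filter looks like this:

```python
def on_event(event):
    # Skip metadata such as 'heartbeat' and 'subscribe-status' events
    if event.get("event") != "price":
        return
    # A price event includes the symbol, the price, and a UNIX timestamp
    print(event["symbol"], event["price"], event["timestamp"])
```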
To ingest the data into your Timescale service, you need to implement the `on_event` function.
After the websocket connection is set up, you can use the `on_event` function to ingest data into the database. This is a data pipeline that ingests real-time financial data into your Timescale service.
Stock trades are ingested in real-time Monday through Friday, typically during normal trading hours of the New York Stock Exchange (9:30 AM to 4:00 PM EST).
Hypertables are the core of Timescale. Hypertables enable Timescale to work efficiently with time-series data. Because Timescale is PostgreSQL, all the standard PostgreSQL tables, indexes, stored procedures and other objects can be created alongside your Timescale hypertables. This makes creating and working with Timescale tables similar to standard PostgreSQL.
1.  Create a standard PostgreSQL table to store the real-time stock trade data using `CREATE TABLE`:

    ```sql
    CREATE TABLE stocks_real_time (
      time TIMESTAMPTZ NOT NULL,
      symbol TEXT NOT NULL,
      price DOUBLE PRECISION NULL,
      day_volume INT NULL
    );
    ```

2.  Convert the standard table into a hypertable partitioned on the `time` column using the `create_hypertable()` function provided by Timescale. You must provide the name of the table and the column in that table that holds the timestamp data to use for partitioning:

    ```sql
    SELECT create_hypertable('stocks_real_time', by_range('time'));
    ```

3.  Create an index to support efficient queries on the `symbol` and `time` columns:

    ```sql
    CREATE INDEX ix_symbol_time ON stocks_real_time (symbol, time DESC);
    ```
Note: When you create a hypertable, it is automatically partitioned on the time column you provide as the second parameter to `create_hypertable()`. Timescale also automatically creates an index on the time column. However, you'll often filter your time-series data on other columns as well, and using indexes appropriately helps your queries perform better.

Because you often query the stock trade data by the company symbol, you should add an index for it. Include the time column, because time-series queries typically look for data in a specific period of time.
When you have other relational data that enhances your time-series data, you can create standard PostgreSQL tables just as you would normally. For this dataset, there is one other table of data, called `company`.

Add a table to store the company name and symbol for the stock trade data:

```sql
CREATE TABLE company (
  symbol TEXT NOT NULL,
  name TEXT NOT NULL
);
```

You now have two tables within your Timescale database: one hypertable named `stocks_real_time`, and one normal PostgreSQL table named `company`.
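Because both tables share the `symbol` column, you can later join them so that query results show company names alongside trades. This is a minimal sketch of such a query, not part of the ingestion pipeline itself:

```sql
-- Show the most recent trades together with the company name
SELECT t.time, c.name, t.price
FROM stocks_real_time t
JOIN company c ON c.symbol = t.symbol
ORDER BY t.time DESC
LIMIT 10;
```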
When you ingest data into a transactional database like Timescale, it is more efficient to insert data in batches rather than inserting data row-by-row. Using one transaction to insert multiple rows can significantly increase the overall ingest capacity and speed of your Timescale database.
A common practice to implement batching is to store new records in memory first, then after the batch reaches a certain size, insert all the records from memory into the database in one transaction. The perfect batch size isn't universal, but you can experiment with different batch sizes (for example, 100, 1000, 10000, and so on) and see which one fits your use case better. Using batching is a fairly common pattern when ingesting data into TimescaleDB from Kafka, Kinesis, or websocket connections.
You can implement a batching solution in Python with Psycopg2.
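As a minimal sketch of the batching pattern described above (assuming an open psycopg2 connection named `conn` and rows that already match the hypertable's column order), the idea looks like this; the full pipeline later in this section wraps the same logic in a class:

```python
from psycopg2.extras import execute_values

MAX_BATCH_SIZE = 100  # experiment with this value for your workload
current_batch = []    # in-memory buffer of rows waiting to be inserted

def add_to_batch(conn, row):
    """Buffer a row and flush the whole batch in one transaction when full."""
    current_batch.append(row)
    if len(current_batch) >= MAX_BATCH_SIZE:
        with conn.cursor() as cursor:
            execute_values(
                cursor,
                "INSERT INTO stocks_real_time (time, symbol, price, day_volume) VALUES %s",
                current_batch,
            )
        conn.commit()
        current_batch.clear()
```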
You can implement the ingestion logic within the `on_event` function that you then pass over to the websocket object.
This function needs to:
- Check if the item is a data item, and not websocket metadata.
- Adjust the data so that it fits the database schema, including the data types, and order of columns.
- Add it to the in-memory batch, which is a list in Python.
- If the batch reaches a certain size, insert the data, and reset or empty the list.
Update the Python script to print out the current batch size, so you can follow when data gets ingested from memory into your database. Use the `<HOST>`, `<PASSWORD>`, and `<PORT>` details for the Timescale service where you want to ingest the data, and your API key from Twelve Data:

```python
import time
import psycopg2
from twelvedata import TDClient
from psycopg2.extras import execute_values
from datetime import datetime

class WebsocketPipeline():
    # name of the hypertable
    DB_TABLE = "stocks_real_time"

    # columns in the hypertable in the correct order
    DB_COLUMNS = ["time", "symbol", "price", "day_volume"]

    # batch size used to insert data in batches
    MAX_BATCH_SIZE = 100

    def __init__(self, conn):
        """Connect to the Twelve Data web socket server and stream
        data into the database.

        Args:
            conn: psycopg2 connection object
        """
        self.conn = conn
        self.current_batch = []
        self.insert_counter = 0

    def _insert_values(self, data):
        if self.conn is not None:
            cursor = self.conn.cursor()
            sql = f"""
            INSERT INTO {self.DB_TABLE} ({','.join(self.DB_COLUMNS)})
            VALUES %s;
            """
            execute_values(cursor, sql, data)
            self.conn.commit()

    def _on_event(self, event):
        """This function gets called whenever there's a new data record coming
        back from the server.

        Args:
            event (dict): data record
        """
        if event["event"] == "price":
            # data record
            timestamp = datetime.utcfromtimestamp(event["timestamp"])
            data = (timestamp, event["symbol"], event["price"], event.get("day_volume"))

            # add new data record to batch
            self.current_batch.append(data)
            print(f"Current batch size: {len(self.current_batch)}")

            # ingest data if max batch size is reached, then reset the batch
            if len(self.current_batch) == self.MAX_BATCH_SIZE:
                self._insert_values(self.current_batch)
                self.insert_counter += 1
                print(f"Batch insert #{self.insert_counter}")
                self.current_batch = []

    def start(self, symbols):
        """Connect to the web socket server and start streaming real-time data
        into the database.

        Args:
            symbols (list of symbols): List of stock/crypto symbols
        """
        td = TDClient(apikey="<YOUR_API_KEY>")
        ws = td.websocket(on_event=self._on_event)
        ws.subscribe(symbols)
        ws.connect()
        while True:
            ws.heartbeat()
            time.sleep(10)

conn = psycopg2.connect(
    database="tsdb",
    host="<HOST>",
    user="tsdbadmin",
    password="<PASSWORD>",
    port="<PORT>"
)
symbols = ["BTC/USD", "ETH/USD", "MSFT", "AAPL"]
websocket = WebsocketPipeline(conn)
websocket.start(symbols=symbols)
```

Run the script:

```bash
python websocket_test.py
```
You can even create separate Python scripts to start multiple websocket connections for different types of symbols, for example, one for stock prices and another for cryptocurrency prices.
If you see an error message similar to this:

```
2022-05-13 18:51:41,976 - ws-twelvedata - ERROR - TDWebSocket ERROR: Handshake status 200 OK
```

Check that you are using a valid API key received from Twelve Data.
The queries in this tutorial are suitable for visualizing in Grafana. If you want to visualize the results of your queries, connect your Grafana account to the stock trade dataset.
Grafana is an open source analytics and monitoring solution. You use Grafana to visualize queries directly from your Timescale Cloud service.
Before you import your data:

- Create a target Timescale Cloud service. Each Timescale Cloud service has a single database that supports the most popular extensions. Timescale Cloud services do not support tablespaces, and there is no superuser associated with a Timescale service.
- Install self-managed Grafana, or sign up for Grafana Cloud.
To connect the data in your Timescale Cloud service to Grafana:
1.  **Log in to Grafana**

    In your browser, log in to either:

    - Self-hosted Grafana: at `http://localhost:3000/`. The default credentials are `admin`, `admin`.
    - Grafana Cloud: use the URL and credentials you set when you created your account.
2.  **Add your Timescale Cloud service as a data source**

    1.  In the Grafana dashboard, navigate to **Configuration** > **Data sources**, then click **Add data source**.
    2.  In **Add data source**, select **PostgreSQL**.
    3.  Configure the data source using your service connection details:

        - **Name**: the name to use for the dataset.
        - **Host**: the host and port for your service, in this format: `<HOST>:<PORT>`. For example: `example.tsdb.cloud.timescale.com:35177`.
        - **Database**: `tsdb`.
        - **User**: `tsdbadmin`, or another privileged user.
        - **Password**: the password for **User**.
        - **TLS/SSL Mode**: select `require`.
        - **PostgreSQL details**: enable `TimescaleDB`.
        - Leave the default setting for all other fields.

    4.  Click **Save & test**.

        Grafana checks that your details are set correctly.
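To confirm the connection works end to end, you can back a Grafana panel with a simple aggregation over the ingested data. This is a minimal sketch using TimescaleDB's `time_bucket()` function; adapt the interval and time range to whatever you have ingested:

```sql
-- Average trade price per symbol in one-minute buckets over the last hour
SELECT
  time_bucket('1 minute', time) AS bucket,
  symbol,
  avg(price) AS avg_price
FROM stocks_real_time
WHERE time > now() - INTERVAL '1 hour'
GROUP BY bucket, symbol
ORDER BY bucket;
```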