Background Information:
I'm in the process of building a workflow for real-time financial data analysis using the Wolfram Language on a budget. Due to the lack of native websocket support in Mathematica, I've decided to create a hybrid system that leverages Python for real-time data acquisition and Mathematica for analysis and processing.
After exploring various options for market data (alpaca, polygon.io, Alpha Vantage, Schwab, etc.), I settled on using the Schwab Developer API with data updates down to the second. This level of detail is crucial for my analysis needs, and it's been the most suitable option I've found without requiring expensive subscriptions.
The Challenge:
My main problem arises when integrating the real-time data feed with Mathematica. I am using SessionSubmit
to run asynchronous tasks in Mathematica that receive data from Python via ZeroMQ. The data is stored in a list (dataList) as it arrives. However, when I attempt to access or display dataList in Mathematica (by typing dataList and pressing Shift+Enter), the notebook blocks and waits for the next data update from the websocket.
I want to be able to access and work with the stored data immediately, without any blocking behavior. Ideally, when I request the data, I should get the current state of dataList instantly, without waiting for the next data update.
What I’ve Tried:
I initially used CreateScheduledTask
, but then switched to SessionSubmit
for more flexibility. Despite this, the blocking issue persists. I've tried using Dynamic
to display dataList, but the front end still waits for the next scheduled task to execute before showing the data. I suspect the issue may be related to how Mathematica handles the task and data synchronization, but I haven't found a way to resolve it.
My Goal:
I currently have data streaming into the WL notebook in real-time. I need a method to immediately access and display the current contents of dataList, without waiting for the next websocket activity or encountering any blocking behavior. The websocket server is written in Python with ~20 lines of code. I could post it if someone needs. Here is my Mathematica code:
subscribe = SocketConnect[{"127.0.0.1", 63217}, {"ZMQ", "SUB"}];
SetOptions[subscribe, "ZMQ_SUBSCRIBE" -> ""];
dataList = {};
receiveMessage[socket_] := Module[{msg, data},msg = SocketReadMessage[socket];
If[msg === $Failed,Nothing,data =ImportString[ByteArrayToString[msg], "JSON"];
AppendTo[dataList, data];data]];
task = SessionSubmit[ScheduledTask[Module[{data},
data = receiveMessage[subscribe];
Print["Received Data: ", data];],1]];
(* asyncTask =
CreateScheduledTask[Module[{data},
data = receiveMessage[subscribe]; Print[data]],1];
StartScheduledTask[asyncTask];*)
The socket is bound to the local host on port 63217 in Python. Mathematica connects to the same port and receives data continuously. The recieveMessage
function converts the incoming messages to strings and appends them to dataList. When I ask for dataList, that is where a significant delay takes place. SessionSubmit
completely blocks and CreateScheduledTask
waits for the update.
Python to Wolfram Data Bridge
import Sconfig,schwabdev,zmq,json
from datetime import datetime
stock_symbol = "SPY"
current_date = datetime.now().strftime("%y-%m-%d")
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:63217")
client = schwabdev.Client(Sconfig.appKey, Sconfig.appSecret)
def my_handler(message, another_var):
print(message)
socket.send_string(json.dumps(message))
streamer = client.stream
streamer.start(my_handler, "test")
streamer.send(streamer.level_one_equities(stock_symbol,
"0,1,2,3,4,5,9"))
This program does the following
- Connects to Schwab's Market Data API using the Python library(scwabdev) built by Tylere Bowers and my credentials.
- Creates a local ZMQ socket connection bound to port 63217.
- Sends a JSON string through the ZMQ socket every time a message is received from the market data API.
The data I'm sending in this example is for the S&P500 index "SPY" and contains the symbol, bid price, ask price, last price, bid size, ask size, and last size.
Help:
This workflow will be incredibly useful for many people if Mathematica can act on that incoming data in real-time. I am looking to fully decouple my working with dataList from the data accumulation effort. If I discover a solution, I'll post it here.
Update: Consider the following non-exhaustive table with GPT4o's opinion of market data providers:
Through the Wolfram App, one can access a robust collection of fundamental and technical data with daily granularity. Doing analysis and decision making on a daily(or longer) time scale can be done seamlessly with either the entity framework or FinancialData
and associated functions. However, analysis with data on time scales < 1 day require the Wolfram Finance Platform and a subscription to one of the first two options in the table. This is not a solution that will work for the average retail investor/trader. $25k/yr wouldn't be so bad if you were making millions but alas, most of us are not quite there yet. Here is one example of an alternative solution I've found to be useful. Solutions require market data from some external source and if you are not doing high frequency(sub-second) micro-transactions, many free options will get the job done. Also, streaming data will require websockets. My choice has been to use Charles Schwab's market data API via Python.
Dynamic Trading Chart - One can create a reliable 10-second chart. Append the data to a .txt file and dynamically import the data. Here is an example:
s = "SPY";
name = s <> "(" <> DateString[{"YearShort", "-", "Month", "-", "Day"}] <> ")";
filePath = "C:\\Team_Python&Wolfram\\" <> name <> ".txt"
data = Import[filePath, "CSV"] /. {timestamp_, o_, h_, l_, c_,v_} :>
{FromUnixTime[timestamp], {o, h, l, c, v}};
DynamicModule[
{currentData = data},
Dynamic[
currentData =
Import[filePath,
"CSV"] /. {timestamp_, o_, h_, l_, c_,
v_} :> {FromUnixTime[timestamp], {o, h, l, c, v}};
TradingChart[currentData],
UpdateInterval -> 10]]
Backtesting - This is where being a WL enthusiast has paid off. The time series functionality surrounding TradingChart
and associated FinancialIndicators
has been quite useful in developing and testing strategies. IMHO, Mathematica is way out in front of all the other programming options when it comes to actually doing the analysis.