Message Boards Message Boards

Real-time financial data analysis with Python/Wolfram Language hybrid – overcoming blocking issues

Background Information:

I'm in the process of building a workflow for real-time financial data analysis using the Wolfram Language on a budget. Due to the lack of native websocket support in Mathematica, I've decided to create a hybrid system that leverages Python for real-time data acquisition and Mathematica for analysis and processing.

After exploring various options for market data (alpaca, polygon.io, Alpha Vantage, Schwab, etc.), I settled on using the Schwab Developer API with data updates down to the second. This level of detail is crucial for my analysis needs, and it's been the most suitable option I've found without requiring expensive subscriptions.

The Challenge:

My main problem arises when integrating the real-time data feed with Mathematica. I am using SessionSubmit to run asynchronous tasks in Mathematica that receive data from Python via ZeroMQ. The data is stored in a list (dataList) as it arrives. However, when I attempt to access or display dataList in Mathematica (by typing dataList and pressing Shift+Enter), the notebook blocks and waits for the next data update from the websocket.

I want to be able to access and work with the stored data immediately, without any blocking behavior. Ideally, when I request the data, I should get the current state of dataList instantly, without waiting for the next data update.

What I’ve Tried:

I initially used CreateScheduledTask, but then switched to SessionSubmit for more flexibility. Despite this, the blocking issue persists. I've tried using Dynamic to display dataList, but the front end still waits for the next scheduled task to execute before showing the data. I suspect the issue may be related to how Mathematica handles the task and data synchronization, but I haven't found a way to resolve it.

My Goal:

I currently have data streaming into the WL notebook in real-time. I need a method to immediately access and display the current contents of dataList, without waiting for the next websocket activity or encountering any blocking behavior. The websocket server is written in Python with ~20 lines of code. I could post it if someone needs. Here is my Mathematica code:

subscribe = SocketConnect[{"127.0.0.1", 63217}, {"ZMQ", "SUB"}];
SetOptions[subscribe, "ZMQ_SUBSCRIBE" -> ""];
dataList = {};
receiveMessage[socket_] := Module[{msg, data},msg = SocketReadMessage[socket];
If[msg === $Failed,Nothing,data =ImportString[ByteArrayToString[msg], "JSON"];
AppendTo[dataList, data];data]];
task = SessionSubmit[ScheduledTask[Module[{data},
data = receiveMessage[subscribe];
Print["Received Data: ", data];],1]];

(* asyncTask = 
CreateScheduledTask[Module[{data}, 
data = receiveMessage[subscribe]; Print[data]],1];
StartScheduledTask[asyncTask];*)

The socket is bound to the local host on port 63217 in Python. Mathematica connects to the same port and receives data continuously. The recieveMessage function converts the incoming messages to strings and appends them to dataList. When I ask for dataList, that is where a significant delay takes place. SessionSubmit completely blocks and CreateScheduledTask waits for the update.

Python to Wolfram Data Bridge

import Sconfig,schwabdev,zmq,json
from datetime import datetime

stock_symbol = "SPY"
current_date = datetime.now().strftime("%y-%m-%d")

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:63217")

client = schwabdev.Client(Sconfig.appKey, Sconfig.appSecret)

def my_handler(message, another_var):    

    print(message)
    socket.send_string(json.dumps(message))

streamer = client.stream
streamer.start(my_handler, "test")
streamer.send(streamer.level_one_equities(stock_symbol, 
"0,1,2,3,4,5,9"))

This program does the following

  • Connects to Schwab's Market Data API using the Python library(scwabdev) built by Tylere Bowers and my credentials.
  • Creates a local ZMQ socket connection bound to port 63217.
  • Sends a JSON string through the ZMQ socket every time a message is received from the market data API.

The data I'm sending in this example is for the S&P500 index "SPY" and contains the symbol, bid price, ask price, last price, bid size, ask size, and last size.

Help:

This workflow will be incredibly useful for many people if Mathematica can act on that incoming data in real-time. I am looking to fully decouple my working with dataList from the data accumulation effort. If I discover a solution, I'll post it here.

Update: Consider the following non-exhaustive table with GPT4o's opinion of market data providers: GPT4o Market Data Rankings

Through the Wolfram App, one can access a robust collection of fundamental and technical data with daily granularity. Doing analysis and decision making on a daily(or longer) time scale can be done seamlessly with either the entity framework or FinancialData and associated functions. However, analysis with data on time scales < 1 day require the Wolfram Finance Platform and a subscription to one of the first two options in the table. This is not a solution that will work for the average retail investor/trader. $25k/yr wouldn't be so bad if you were making millions but alas, most of us are not quite there yet. Here is one example of an alternative solution I've found to be useful. Solutions require market data from some external source and if you are not doing high frequency(sub-second) micro-transactions, many free options will get the job done. Also, streaming data will require websockets. My choice has been to use Charles Schwab's market data API via Python.

  • Dynamic Trading Chart - One can create a reliable 10-second chart. Append the data to a .txt file and dynamically import the data. Here is an example:

    s = "SPY";
    name = s <> "(" <> DateString[{"YearShort", "-", "Month", "-", "Day"}] <> ")";
    filePath = "C:\\Team_Python&Wolfram\\" <> name <> ".txt"
    
    data = Import[filePath, "CSV"] /. {timestamp_, o_, h_, l_, c_,v_} :> 
    {FromUnixTime[timestamp], {o, h, l, c, v}};
    
    DynamicModule[
     {currentData = data},
     Dynamic[
      currentData = 
       Import[filePath, 
         "CSV"] /. {timestamp_, o_, h_, l_, c_, 
          v_} :> {FromUnixTime[timestamp], {o, h, l, c, v}};
      TradingChart[currentData], 
      UpdateInterval -> 10]]
    
  • Backtesting - This is where being a WL enthusiast has paid off. The time series functionality surrounding TradingChart and associated FinancialIndicators has been quite useful in developing and testing strategies. IMHO, Mathematica is way out in front of all the other programming options when it comes to actually doing the analysis.

POSTED BY: John Martin
2 Replies
Posted 2 months ago

Ain't it the truth. When it comes to stuff like this retail non-professionals like myself play 5th fiddle when it comes to getting live data into the box to feed MMA apps. And I totally agree there is nothing on this planet remotely as good as Mathematica so we soldier on. But there are workarounds which don't involve a live feed into MMA. I trade option spread strategies at Schwab with TradingView and ThinkOrSwim using their live data. But these would be useless without Mathematica using polygon.io data where a static view of spreads slows down the process allowing me to orient myself relative to the insanity on ThinkOrSwim and TradingView. Nobody says building this stuff is easy but its very satisfying and has afforded some success. I attach an image only to show what is possible with MMA and to encourage perserveance. enter image description here

POSTED BY: Jonathan Miller
Posted 2 months ago

John,

I am interested in the websocket server is written in Python as I want to see if it can be adapted to Interactive Broker.

Regards

Alan

POSTED BY: Alan Mok
Reply to this discussion
Community posts can be styled and formatted using the Markdown syntax.
Reply Preview
Attachments
Remove
or Discard

Group Abstract Group Abstract