Ever wondered how to capture DeFi events as they happen on Solana? In this hands-on tutorial, you’ll build a real-time scraper that listens to the blockchain via WebSocket and decodes events from major protocols like Jupiter, Pump.fun, and Raydium.
By the end of this guide, you’ll have a working Python application that prints live DeFi events to your console, perfect for building trading bots, analytics dashboards, or simply understanding how on-chain activity works. If you want to jump straight to code, check the repository solana-defi-scraper.

What We’re Building
Let’s start by understanding exactly what our scraper will do:
- Connect to Solana’s mainnet via WebSocket
- Filter logs from specific DeFi protocols (Jupiter V6, Pump.fun, Raydium V4)
- Extract base64-encoded event data from transaction logs
- Decode raw bytes into readable Python objects
- Display real-time events like swaps, trades, and pool creations
Here’s a preview of what you’ll see running:
Starting Solana DeFi Scraper...
Subscribed to Jupiter logs...
Subscribed to Pump.fun logs...
Subscribed to Raydium V4 logs...
=== PUMP TRADE EVENT ===
{ "mint": "7GCihgDB8fe6KNjn2MYtkzZcRjQy3t9GHdC8uHYmW2hr", "sol_amount": 50000000, "token_amount": 12500000 }
=== JUPITER SWAP EVENT ===
{ "timestamp": 1703123456, "pool": "58oQChx4yWmvKdwLLZzBi4ChoCc2fqCUWBkwMihLYQo2", "amount_in": 1000000 }
Prerequisites & Setup
Before we dive in, make sure you have:
- Python 3.12 installed
- Poetry for dependency management
- Basic understanding of APIs and WebSockets
Step 1: Create Your Project
First, let’s set up our project structure:
mkdir solana-defi-scraper
cd solana-defi-scraper
poetry init --no-interaction
Create this folder structure:
solana-defi-scraper/
├── pyproject.toml
├── main.py
├── README.md
└── src/
    ├── __init__.py
    ├── constants.py
    ├── wss.py
    ├── event_processor.py
    ├── jupiter_layout.py
    ├── pump_layout.py
    └── raydium_layout.py
Step 2: Configure Dependencies
Update your pyproject.toml with the required packages:
[tool.poetry]
name = "solana-defi-scraper"
version = "0.1.0"
description = "Real-time Solana DeFi event scraper"
authors = ["Your Name <your.email@example.com>"]
readme = "README.md"
packages = [{ include = "src" }]
[tool.poetry.dependencies]
python = "^3.12"
websocket-client = "^1.6.4"
construct = "^2.10.68"
solders = "^0.21.0"
[tool.poetry.scripts]
solana-defi-scraper = "main:main"
Install the dependencies:
poetry install
Building the Core Components
Step 3: Define Protocol Constants
Let’s start by creating our constants file. This centralizes all the important identifiers we’ll need:
Create src/constants.py:
# src/constants.py
"""
Solana Program IDs and endpoints for DeFi protocols
"""
# Program IDs - these are the unique identifiers for each protocol on Solana
JUPITER_PROGRAM_ID = "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4"
PUMP_FUN_PROGRAM_ID = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P"
RAYDIUM_V4_PROGRAM_ID = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"
# Solana WebSocket endpoint for real-time data
WSS_ENDPOINT = "wss://api.mainnet-beta.solana.com/"
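Why these Program IDs matter: Every Solana program lives at a unique address, its program ID. The logsSubscribe filter we use later matches transactions that mention a given address, and the runtime’s log lines (for example, "Program <id> invoke [1]") name the program being executed, so these IDs let us filter for specific DeFi activity.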
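A quick way to catch a mistyped ID is to check that it decodes to exactly 32 bytes, the size of a Solana public key. The sketch below uses a small hand-rolled base58 decoder so it runs with the standard library alone; the helper names b58decode and is_valid_program_id are made up for this example and are not part of the scraper.

```python
# Hypothetical helpers for illustration; not part of the tutorial's modules.
B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def b58decode(text: str) -> bytes:
    """Decode a base58 string (Bitcoin alphabet, also used by Solana) to bytes."""
    number = 0
    for char in text:
        # str.index raises ValueError for characters outside the alphabet
        number = number * 58 + B58_ALPHABET.index(char)
    body = number.to_bytes((number.bit_length() + 7) // 8, "big")
    # Each leading '1' character stands for one leading zero byte
    leading_zeros = len(text) - len(text.lstrip("1"))
    return b"\x00" * leading_zeros + body


def is_valid_program_id(text: str) -> bool:
    """Return True if the string is base58 and decodes to 32 bytes."""
    try:
        return len(b58decode(text)) == 32
    except ValueError:
        return False


PROGRAM_IDS = {
    "Jupiter": "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4",
    "Pump.fun": "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P",
    "Raydium V4": "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8",
}

for name, program_id in PROGRAM_IDS.items():
    print(f"{name}: valid={is_valid_program_id(program_id)}")
```

In the real project you could lean on solders for the same check, since it is already a dependency.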
Step 4: Create the Main Entry Point
Create main.py:
# main.py
import os
import sys

# Make sure the project root (the folder that contains src/) is importable,
# so that "from src.wss import ..." resolves regardless of how we are launched
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from src.wss import start_websocket


def main():
    """Main entry point for the Solana DeFi scraper"""
    print("Starting Solana DeFi Scraper...")
    print("Monitoring Jupiter, Pump.fun, and Raydium protocols...")
    print("Press Ctrl+C to stop")
    print("-" * 50)
    try:
        start_websocket()
    except KeyboardInterrupt:
        print("\nShutdown requested by user. Goodbye!")
        sys.exit(0)
    except Exception as e:
        print(f"Unexpected error in main: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Understanding Solana Event Decoding
Before we write the decoding logic, let’s understand what we’re working with:
Solana logs contain base64-encoded data that looks like this:
Program data: SGVsbG8gV29ybGQ=
Our job is to:
- Extract this base64 string
- Decode it to raw bytes
- Parse the bytes according to each protocol’s structure
- Convert it to readable Python objects
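The first two steps can be tried in isolation before any network code exists. Here is a minimal sketch using the sample line above; the function name extract_payload is hypothetical, and the regular expression mirrors the one the event processor uses later in this guide.

```python
import base64
import re
from typing import Optional

# Matches the base64 payload that follows "Program data: " in a log line
PROGRAM_DATA_RE = re.compile(r"Program data: ([A-Za-z0-9+/=]+)")


def extract_payload(log_line: str) -> Optional[bytes]:
    """Return the decoded bytes of a 'Program data:' line, or None."""
    match = PROGRAM_DATA_RE.search(log_line)
    if match is None:
        return None
    return base64.b64decode(match.group(1))


sample_line = "Program data: SGVsbG8gV29ybGQ="
payload = extract_payload(sample_line)
print(payload)  # b'Hello World'

# Lines without a payload are simply skipped
print(extract_payload("Program log: Instruction: Swap"))  # None
```

The sample payload is plain text, which makes it easy to verify. Real payloads are binary, which is where the layouts in the next step come in.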
Step 5: Create Protocol Layout Decoders
Each DeFi protocol structures its data differently. We’ll use the construct library to define these structures.
Create src/jupiter_layout.py:
# src/jupiter_layout.py
from dataclasses import dataclass
from typing import Optional

from construct import Bytes, Int64ul, Struct
from solders.pubkey import Pubkey


@dataclass
class JupiterSwapEvent:
    """Represents a Jupiter swap event"""
    timestamp: int
    pool: str
    amount_in: int
    amount_out: int
    user: str


# Define the binary layout for Jupiter swap events
jupiter_swap_layout = Struct(
    "timestamp" / Int64ul,
    "pool" / Bytes(32),
    "amount_in" / Int64ul,
    "amount_out" / Int64ul,
    "user" / Bytes(32),
)


def decode_jupiter_swap(data: bytes) -> Optional[JupiterSwapEvent]:
    """Decode Jupiter swap event from raw bytes"""
    try:
        parsed = jupiter_swap_layout.parse(data)
        return JupiterSwapEvent(
            timestamp=parsed.timestamp,
            # Python's base64 module has no base58 support, so we let
            # solders render the 32-byte keys as base58 strings
            pool=str(Pubkey.from_bytes(parsed.pool)),
            amount_in=parsed.amount_in,
            amount_out=parsed.amount_out,
            user=str(Pubkey.from_bytes(parsed.user)),
        )
    except Exception as e:
        print(f"Failed to decode Jupiter swap: {e}")
        return None
Try this yourself: Create similar files for pump_layout.py and raydium_layout.py following the same pattern. Each protocol will have different field names and structures.
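Before pointing a layout at live traffic, it helps to round-trip a synthetic payload through it. The sketch below does that for the Jupiter swap layout, using the standard library’s struct module as a stand-in for construct so it runs without extra installs. The field values are made up, and pack_swap and unpack_swap are hypothetical helper names.

```python
import struct

# Little-endian: u64 timestamp, 32-byte pool, u64 amount_in,
# u64 amount_out, 32-byte user (same field order as jupiter_swap_layout)
SWAP_FORMAT = "<Q32sQQ32s"


def pack_swap(timestamp: int, pool: bytes, amount_in: int,
              amount_out: int, user: bytes) -> bytes:
    """Build a synthetic payload with the same shape as the layout."""
    return struct.pack(SWAP_FORMAT, timestamp, pool, amount_in, amount_out, user)


def unpack_swap(data: bytes) -> dict:
    """Parse a payload back into named fields."""
    timestamp, pool, amount_in, amount_out, user = struct.unpack(SWAP_FORMAT, data)
    return {
        "timestamp": timestamp,
        "pool": pool,
        "amount_in": amount_in,
        "amount_out": amount_out,
        "user": user,
    }


fake_pool = bytes(range(32))   # made-up 32-byte key
fake_user = bytes([7]) * 32    # made-up 32-byte key
payload = pack_swap(1703123456, fake_pool, 1_000_000, 985_000, fake_user)

print(len(payload))            # 88 bytes: 8 + 32 + 8 + 8 + 32
print(unpack_swap(payload)["amount_in"])
```

If a real payload is shorter or longer than the layout expects, that is a strong hint that the field list needs adjusting.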
Step 6: Build the Event Processor
The event processor is the brain of our scraper. It takes raw logs and turns them into structured events.
Create src/event_processor.py:
# src/event_processor.py
import re
import base64
from typing import Optional, Dict, Any

from .constants import JUPITER_PROGRAM_ID, PUMP_FUN_PROGRAM_ID, RAYDIUM_V4_PROGRAM_ID
from .jupiter_layout import decode_jupiter_swap
# Import other decoders as you create them


class EventProcessor:
    """Processes Solana logs and extracts DeFi events"""

    def __init__(self):
        # Pattern to match "Program data: <base64>" in logs
        self.program_data_pattern = re.compile(r'Program data: ([A-Za-z0-9+/=]+)')

    def process_log_entry(self, log_entry: Dict[str, Any]) -> None:
        """Process a single log entry from the WebSocket"""
        try:
            # Extract logs array from the WebSocket message
            logs = log_entry.get('logs', [])
            # Determine which protocol this log belongs to
            protocol = self._identify_protocol(logs)
            if not protocol:
                return
            # Extract and decode program data
            program_data = self._extract_program_data(logs)
            if not program_data:
                return
            # Route to appropriate decoder based on protocol
            event = self._decode_event(protocol, program_data)
            if event:
                self._print_event(protocol, event)
        except Exception as e:
            print(f"Error processing log entry: {e}")

    def _identify_protocol(self, logs: list) -> Optional[str]:
        """Identify which DeFi protocol generated these logs"""
        log_text = ' '.join(logs)
        if JUPITER_PROGRAM_ID in log_text:
            return 'jupiter'
        elif PUMP_FUN_PROGRAM_ID in log_text:
            return 'pump'
        elif RAYDIUM_V4_PROGRAM_ID in log_text:
            return 'raydium'
        return None

    def _extract_program_data(self, logs: list) -> Optional[bytes]:
        """Extract base64 program data from logs"""
        for log in logs:
            match = self.program_data_pattern.search(log)
            if match:
                try:
                    return base64.b64decode(match.group(1))
                except Exception as e:
                    print(f"Failed to decode base64: {e}")
        return None

    def _decode_event(self, protocol: str, data: bytes) -> Optional[Any]:
        """Decode event data based on protocol"""
        try:
            if protocol == 'jupiter':
                return decode_jupiter_swap(data)
            # Add other protocol decoders here
            # elif protocol == 'pump':
            #     return decode_pump_trade(data)
            # elif protocol == 'raydium':
            #     return decode_raydium_swap(data)
        except Exception as e:
            print(f"Failed to decode {protocol} event: {e}")
        return None

    def _print_event(self, protocol: str, event: Any) -> None:
        """Print formatted event to console"""
        protocol_names = {
            'jupiter': 'JUPITER SWAP EVENT',
            'pump': 'PUMP TRADE EVENT',
            'raydium': 'RAYDIUM SWAP EVENT'
        }
        print(f"\n=== {protocol_names.get(protocol, 'UNKNOWN')} ===")
        print(f"{event}")
        print("-" * 50)
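One thing to keep in mind about _identify_protocol: it returns the first match in a fixed order, so a Jupiter swap routed through Raydium is labeled jupiter even though both programs ran. If you need the full picture, a possible refinement is to read the runtime’s "Program <id> invoke [depth]" lines and collect every program that was invoked. This is a sketch, not part of the processor above; invoked_programs is a hypothetical name and the sample logs are hand-written.

```python
import re
from typing import List, Set

JUPITER = "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4"
RAYDIUM = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"

# Runtime lines look like "Program <id> invoke [1]"; the number is call depth
INVOKE_RE = re.compile(r"^Program (\w+) invoke \[(\d+)\]$")


def invoked_programs(logs: List[str]) -> Set[str]:
    """Return the set of program IDs that appear in invoke lines."""
    return {m.group(1) for line in logs if (m := INVOKE_RE.match(line))}


# Hand-written sample: Jupiter calling into Raydium
sample_logs = [
    f"Program {JUPITER} invoke [1]",
    "Program log: Instruction: Route",
    f"Program {RAYDIUM} invoke [2]",
    f"Program {RAYDIUM} success",
    f"Program {JUPITER} success",
]

print(invoked_programs(sample_logs))
```

With the set in hand, you could try each matching decoder instead of stopping at the first protocol found.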
Step 7: Create the WebSocket Connection
Now for the exciting part, connecting to Solana’s live data stream!
Create src/wss.py:
# src/wss.py
import json
import time

import websocket

from .constants import WSS_ENDPOINT, JUPITER_PROGRAM_ID, PUMP_FUN_PROGRAM_ID, RAYDIUM_V4_PROGRAM_ID
from .event_processor import EventProcessor


class SolanaWebSocketClient:
    """WebSocket client for Solana log subscriptions"""

    def __init__(self):
        self.ws = None
        self.event_processor = EventProcessor()
        self.subscription_ids = []

    def on_message(self, ws, message: str):
        """Handle incoming WebSocket messages"""
        try:
            data = json.loads(message)
            # Check if this is a log notification
            if 'params' in data and 'result' in data['params']:
                log_entry = data['params']['result']['value']
                self.event_processor.process_log_entry(log_entry)
        except Exception as e:
            print(f"Error processing message: {e}")

    def on_error(self, ws, error):
        """Handle WebSocket errors"""
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """Handle WebSocket close"""
        print("WebSocket connection closed")

    def on_open(self, ws):
        """Handle WebSocket open - subscribe to logs"""
        print("WebSocket connection opened")
        # Subscribe to logs from each protocol
        protocols = [
            ("Jupiter", JUPITER_PROGRAM_ID),
            ("Pump.fun", PUMP_FUN_PROGRAM_ID),
            ("Raydium V4", RAYDIUM_V4_PROGRAM_ID)
        ]
        # Give every request its own JSON-RPC id so responses can be told apart
        for request_id, (name, program_id) in enumerate(protocols, start=1):
            subscription = {
                "jsonrpc": "2.0",
                "id": request_id,
                "method": "logsSubscribe",
                "params": [
                    {"mentions": [program_id]},
                    {"commitment": "finalized"}
                ]
            }
            ws.send(json.dumps(subscription))
            print(f"Subscribed to {name} logs...")

    def start(self):
        """Start the WebSocket connection with reconnection logic"""
        while True:
            try:
                print(f"Connecting to {WSS_ENDPOINT}...")
                self.ws = websocket.WebSocketApp(
                    WSS_ENDPOINT,
                    on_message=self.on_message,
                    on_error=self.on_error,
                    on_close=self.on_close,
                    on_open=self.on_open
                )
                self.ws.run_forever()
            except Exception as e:
                print(f"Connection failed: {e}")
            # run_forever() also returns when the server closes the connection
            print("Reconnecting in 5 seconds...")
            time.sleep(5)


def start_websocket():
    """Entry point to start the WebSocket client"""
    client = SolanaWebSocketClient()
    client.start()
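The message handling is easy to exercise offline, without opening a socket. The sketch below builds a logsSubscribe request and pulls the log entry out of a hand-written logsNotification message. The function names are hypothetical, and the signature and subscription number in the sample are placeholders rather than real chain data.

```python
import json
from typing import Any, Dict, Optional


def build_logs_subscribe(request_id: int, program_id: str,
                         commitment: str = "finalized") -> str:
    """Serialize a logsSubscribe request for a single program ID."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "logsSubscribe",
        "params": [{"mentions": [program_id]}, {"commitment": commitment}],
    })


def extract_log_entry(message: str) -> Optional[Dict[str, Any]]:
    """Return the 'value' object of a logsNotification, else None."""
    data = json.loads(message)
    if data.get("method") != "logsNotification":
        return None  # e.g. the confirmation reply to a subscribe request
    return data["params"]["result"]["value"]


# Hand-written sample messages (placeholder values)
confirmation = json.dumps({"jsonrpc": "2.0", "result": 42, "id": 1})
notification = json.dumps({
    "jsonrpc": "2.0",
    "method": "logsNotification",
    "params": {
        "result": {
            "context": {"slot": 1},
            "value": {
                "signature": "placeholder-signature",
                "err": None,
                "logs": ["Program data: SGVsbG8gV29ybGQ="],
            },
        },
        "subscription": 42,
    },
})

print(extract_log_entry(confirmation))          # None
print(extract_log_entry(notification)["logs"])
```

Checking the method field is a slightly stricter test than looking for params and result keys, and it makes the handler’s intent easier to read.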
Testing Your Scraper
Step 8: Run Your First Test
Let’s see if everything works:
poetry run solana-defi-scraper
You should see:
Starting Solana DeFi Scraper...
Monitoring Jupiter, Pump.fun, and Raydium protocols...
Press Ctrl+C to stop
--------------------------------------------------
Connecting to wss://api.mainnet-beta.solana.com/...
WebSocket connection opened
Subscribed to Jupiter logs...
Subscribed to Pump.fun logs...
Subscribed to Raydium V4 logs...
Troubleshooting tips:
- If you see connection errors, check your internet connection
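- If no events appear right away, that’s normal. Decodable events arrive in bursts, and the finalized commitment level we subscribe with adds a short delay before a transaction is reported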
- If you see decode errors, the protocol layouts might need adjustment
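If connection errors keep coming back, a fixed five-second retry may not be the friendliest choice, since public RPC endpoints commonly apply rate limits. One common alternative is exponential backoff with jitter. The sketch below only computes the delays; backoff_delay and its parameters are hypothetical names, and wiring it into the reconnect loop is left to you.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay in seconds before reconnect attempt `attempt` (0-based).

    The ceiling doubles with every attempt up to `cap`, and the actual
    delay is drawn uniformly below it so clients do not retry in lockstep.
    """
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0, ceiling)


for attempt in range(8):
    ceiling = min(60.0, 1.0 * 2 ** attempt)
    print(f"attempt {attempt}: sleep up to {ceiling:.0f}s, "
          f"picked {backoff_delay(attempt):.2f}s")
```

Remember to reset the attempt counter once a connection has been opened successfully, otherwise every later hiccup starts at the maximum delay.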
Step 9: Understanding the Data Flow
Here’s what happens when a DeFi transaction occurs:
- User executes a swap on Jupiter/Pump.fun/Raydium
- Solana processes the transaction and emits logs
- Our WebSocket receives the logs in real-time
- EventProcessor identifies which protocol the logs belong to
- Layout decoder extracts structured data from base64 payload
- Console displays the decoded event
Congratulations!
You’ve built a real-time Solana DeFi event scraper that connects to Solana’s live data stream via WebSocket, filters events from major DeFi protocols, and decodes raw blockchain data into readable Python objects. That lays the foundation for more complex DeFi applications. As next steps, you can:
- Build a trading bot that reacts to specific events
- Create a dashboard to visualize DeFi activity
- Enrich events with token prices from APIs
- Implement webhooks to notify other systems
- Scale with async processing for higher throughput
Happy scraping!