
Build a Real-Time Solana DeFi Scraper with Python

Solana Defi Scraper - Jupiter, Raydium and Pump.fun

Ever wondered how to capture DeFi events as they happen on Solana? In this hands-on tutorial, you’ll build a real-time scraper that listens to the blockchain via WebSocket and decodes events from major protocols like Jupiter, Pump.fun, and Raydium.

By the end of this guide, you’ll have a working Python application that prints live DeFi events to your console, perfect for building trading bots, analytics dashboards, or simply understanding how on-chain activity works. If you want to jump straight to the code, check the solana-defi-scraper repository.


What We’re Building

Let’s start by understanding exactly what our scraper will do:

  • Connect to Solana’s mainnet via WebSocket
  • Filter logs from specific DeFi protocols (Jupiter V6, Pump.fun, Raydium V4)
  • Extract base64-encoded event data from transaction logs
  • Decode raw bytes into readable Python objects
  • Display real-time events like swaps, trades, and pool creations

Here’s a preview of what you’ll see running:

Starting Solana DeFi Scraper...
Subscribed to Jupiter logs...
Subscribed to Pump.fun logs...
Subscribed to Raydium V4 logs...
=== PUMP TRADE EVENT ===
{ "mint": "7GCihgDB8fe6KNjn2MYtkzZcRjQy3t9GHdC8uHYmW2hr", "sol_amount": 50000000, "token_amount": 12500000 }
=== JUPITER SWAP EVENT ===
{ "timestamp": 1703123456, "pool": "58oQChx4yWmvKdwLLZzBi4ChoCc2fqCUWBkwMihLYQo2", "amount_in": 1000000 }

Prerequisites & Setup

Before we dive in, make sure you have:

  • Python 3.12 installed
  • Poetry for dependency management
  • Basic understanding of APIs and WebSockets

Step 1: Create Your Project

First, let’s set up our project structure:

mkdir solana-defi-scraper
cd solana-defi-scraper
poetry init --no-interaction

Create this folder structure:

solana-defi-scraper/
├── pyproject.toml
├── main.py
├── README.md
└── src/
    ├── __init__.py
    ├── constants.py
    ├── wss.py
    ├── event_processor.py
    ├── jupiter_layout.py
    ├── pump_layout.py
    └── raydium_layout.py

Step 2: Configure Dependencies

Update your pyproject.toml with the required packages:

[tool.poetry]
name = "solana-defi-scraper"
version = "0.1.0"
description = "Real-time Solana DeFi event scraper"
authors = ["Your Name <your.email@example.com>"]
readme = "README.md"
packages = [{ include = "src" }]

[tool.poetry.dependencies]
python = "^3.12"
websocket-client = "^1.6.4"
construct = "^2.10.68"
solders = "^0.21.0"

[tool.poetry.scripts]
solana-defi-scraper = "main:main"

Install the dependencies:

poetry install

Building the Core Components

Step 3: Define Protocol Constants

Let’s start by creating our constants file. This centralizes all the important identifiers we’ll need:

Create src/constants.py:

# src/constants.py
"""
Solana Program IDs and endpoints for DeFi protocols
"""

# Program IDs - these are the unique identifiers for each protocol on Solana
JUPITER_PROGRAM_ID = "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4"
PUMP_FUN_PROGRAM_ID = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P"
RAYDIUM_V4_PROGRAM_ID = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"

# Solana WebSocket endpoint for real-time data
WSS_ENDPOINT = "wss://api.mainnet-beta.solana.com/"

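Why these Program IDs matter: Every Solana program is identified by its program ID, a base58-encoded 32-byte public key. When a transaction invokes one of these programs, the runtime writes log lines such as "Program <program ID> invoke [1]", and the logsSubscribe "mentions" filter lets us receive only the transactions that reference a given program ID.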
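Because a mistyped program ID silently subscribes to nothing, a quick sanity check is to decode each constant and confirm it is 32 bytes long. The sketch below is a minimal standard-library base58 decoder; the helper name b58decode is hypothetical, and inside the project the solders dependency offers the same validation through Pubkey.from_string.

```python
# Base58 alphabet used by Solana (Bitcoin variant: no 0, O, I or l)
BASE58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def b58decode(text: str) -> bytes:
    """Decode a base58 string; raises ValueError on characters outside the alphabet."""
    n = 0
    for ch in text:
        n = n * 58 + BASE58_ALPHABET.index(ch)
    body = n.to_bytes((n.bit_length() + 7) // 8, "big")
    # Each leading "1" encodes one leading zero byte
    leading_zeros = len(text) - len(text.lstrip("1"))
    return b"\x00" * leading_zeros + body


# The constants from src/constants.py, copied here so the sketch runs on its own
PROGRAM_IDS = {
    "JUPITER_PROGRAM_ID": "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4",
    "PUMP_FUN_PROGRAM_ID": "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P",
    "RAYDIUM_V4_PROGRAM_ID": "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8",
}

for name, program_id in PROGRAM_IDS.items():
    # A valid program ID decodes to exactly 32 bytes
    print(name, len(b58decode(program_id)))
```

A character outside the alphabet (such as 0 or l) makes str.index raise ValueError, which is usually the first sign of a copy-paste error.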

Step 4: Create the Main Entry Point

Create main.py:

# main.py
import os
import sys

# Put the project root on sys.path so the "src" package is importable
# whether this file is run directly or through the Poetry script
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from src.wss import start_websocket


def main():
    """Main entry point for the Solana DeFi scraper"""
    print("Starting Solana DeFi Scraper...")
    print("Monitoring Jupiter, Pump.fun, and Raydium protocols...")
    print("Press Ctrl+C to stop")
    print("-" * 50)

    try:
        start_websocket()
    except KeyboardInterrupt:
        print("\nShutdown requested by user. Goodbye!")
        sys.exit(0)
    except Exception as e:
        print(f"Unexpected error in main: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

Understanding Solana Event Decoding

Before we write the decoding logic, let’s understand what we’re working with:

Programs that emit events write a log line starting with "Program data:" followed by base64-encoded bytes, like this:

Program data: SGVsbG8gV29ybGQ=

Our job is to:

  1. Extract this base64 string
  2. Decode it to raw bytes
  3. Parse the bytes according to each protocol’s structure
  4. Convert it to readable Python objects
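Steps 1 and 2, plus one detail that matters for step 3, can be sketched with the standard library alone. The sketch assumes the event was written by an Anchor program's emit! macro, in which case the payload starts with an 8-byte event discriminator (the first 8 bytes of sha256("event:<EventName>")) followed by the event fields. The helper names are hypothetical, not taken from the project repository.

```python
import base64
import hashlib
import re
from typing import Optional, Tuple

# Same pattern the EventProcessor (Step 6) uses for "Program data: <base64>" lines
PROGRAM_DATA = re.compile(r"Program data: ([A-Za-z0-9+/=]+)")


def extract_payload(log_line: str) -> Optional[bytes]:
    """Steps 1-2: pull out the base64 string and decode it to raw bytes."""
    match = PROGRAM_DATA.search(log_line)
    return base64.b64decode(match.group(1)) if match else None


def anchor_event_discriminator(event_name: str) -> bytes:
    """First 8 bytes of sha256("event:<EventName>"), Anchor's event tag."""
    return hashlib.sha256(f"event:{event_name}".encode()).digest()[:8]


def split_anchor_event(raw: bytes) -> Tuple[bytes, bytes]:
    """Separate the 8-byte discriminator from the encoded event fields."""
    return raw[:8], raw[8:]


print(extract_payload("Program data: SGVsbG8gV29ybGQ="))  # b'Hello World'
```

Comparing the first 8 bytes of a payload with anchor_event_discriminator("<EventName>") for the event you expect is a cheap way to skip payloads of other event types before handing them to a layout parser.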

Step 5: Create Protocol Layout Decoders

Each DeFi protocol structures its data differently. We’ll use the construct library to define these structures.

Create src/jupiter_layout.py:

# src/jupiter_layout.py
from dataclasses import dataclass
from typing import Optional

from construct import Bytes, Int64ul, Struct
# Python's base64 module has no base58 encoder; solders' Pubkey renders
# 32-byte keys as base58 strings
from solders.pubkey import Pubkey

@dataclass
class JupiterSwapEvent:
    """Represents a Jupiter swap event"""
    timestamp: int
    pool: str
    amount_in: int
    amount_out: int
    user: str

# Define the binary layout for Jupiter swap events.
# The field list is illustrative: check it against the program's IDL.
jupiter_swap_layout = Struct(
    # Anchor events start with an 8-byte event discriminator
    "discriminator" / Bytes(8),
    "timestamp" / Int64ul,
    "pool" / Bytes(32),
    "amount_in" / Int64ul,
    "amount_out" / Int64ul,
    "user" / Bytes(32),
)

def decode_jupiter_swap(data: bytes) -> Optional[JupiterSwapEvent]:
    """Decode Jupiter swap event from raw bytes"""
    try:
        parsed = jupiter_swap_layout.parse(data)
        return JupiterSwapEvent(
            timestamp=parsed.timestamp,
            pool=str(Pubkey.from_bytes(parsed.pool)),
            amount_in=parsed.amount_in,
            amount_out=parsed.amount_out,
            user=str(Pubkey.from_bytes(parsed.user))
        )
    except Exception as e:
        print(f"Failed to decode Jupiter swap: {e}")
        return None

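Try this yourself: Create similar files for pump_layout.py and raydium_layout.py following the same pattern. Each protocol uses its own event names, field order, and field sizes, so take the layouts from each program’s IDL or source rather than guessing. Raydium’s AMM v4 program is not built with Anchor and writes its swap data as "Program log: ray_log: <base64>" lines instead of "Program data:" lines, so its decoder also needs its own extraction pattern.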
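As a starting point for pump_layout.py, here is a sketch of a Pump.fun trade decoder. It assumes the TradeEvent field order from the public Pump.fun IDL as we understand it (mint, sol_amount, token_amount, is_buy, user, timestamp, with later fields such as reserves ignored) behind an 8-byte Anchor discriminator; verify this against the current IDL before relying on it. It uses the standard-library struct module instead of construct so it runs without extra dependencies, and the same layout translates directly into a construct Struct. The small base58 encoder is there because Python’s base64 module does not provide one.

```python
import struct
from dataclasses import dataclass
from typing import Optional

BASE58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

# ASSUMED field order (from the public Pump.fun IDL; verify before use):
# mint, sol_amount, token_amount, is_buy, user, timestamp.
# "<" = little-endian with no padding; later fields are not parsed.
TRADE_EVENT_FORMAT = "<32sQQ?32sq"
DISCRIMINATOR_LEN = 8  # Anchor event tag in front of the fields


def b58encode(raw: bytes) -> str:
    """Base58 (Bitcoin alphabet) encoding, as used for Solana addresses."""
    n = int.from_bytes(raw, "big")
    encoded = ""
    while n:
        n, rem = divmod(n, 58)
        encoded = BASE58_ALPHABET[rem] + encoded
    # Each leading zero byte is written as a leading "1"
    leading_zeros = len(raw) - len(raw.lstrip(b"\x00"))
    return "1" * leading_zeros + encoded


@dataclass
class PumpTradeEvent:
    mint: str
    sol_amount: int
    token_amount: int
    is_buy: bool
    user: str
    timestamp: int


def decode_pump_trade(data: bytes) -> Optional[PumpTradeEvent]:
    """Decode a Pump.fun trade event; returns None if the payload is too short."""
    if len(data) < DISCRIMINATOR_LEN + struct.calcsize(TRADE_EVENT_FORMAT):
        return None
    mint, sol_amount, token_amount, is_buy, user, timestamp = struct.unpack_from(
        TRADE_EVENT_FORMAT, data, DISCRIMINATOR_LEN
    )
    return PumpTradeEvent(
        mint=b58encode(mint),
        sol_amount=sol_amount,
        token_amount=token_amount,
        is_buy=is_buy,
        user=b58encode(user),
        timestamp=timestamp,
    )
```

The function name decode_pump_trade matches the commented-out pump branch in the event processor’s _decode_event, so it can be wired in by uncommenting that branch and importing it.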

Step 6: Build the Event Processor

The event processor is the brain of our scraper. It takes raw logs and turns them into structured events.

Create src/event_processor.py:

# src/event_processor.py
import base64
import json
import re
from dataclasses import asdict, is_dataclass
from typing import Any, Dict, List, Optional

from .constants import JUPITER_PROGRAM_ID, PUMP_FUN_PROGRAM_ID, RAYDIUM_V4_PROGRAM_ID
from .jupiter_layout import decode_jupiter_swap
# Import other decoders as you create them

class EventProcessor:
    """Processes Solana logs and extracts DeFi events"""
    
    def __init__(self):
        # Pattern to match "Program data: <base64>" in logs
        self.program_data_pattern = re.compile(r'Program data: ([A-Za-z0-9+/=]+)')
    
    def process_log_entry(self, log_entry: Dict[str, Any]) -> None:
        """Process the 'value' object of a single logsNotification"""
        try:
            # Failed transactions still emit logs; skip them
            if log_entry.get('err') is not None:
                return
            
            # Extract the logs array from the notification value
            logs = log_entry.get('logs') or []
            
            # Determine which protocol this log belongs to
            protocol = self._identify_protocol(logs)
            if not protocol:
                return
            
            # Extract and decode program data
            program_data = self._extract_program_data(logs)
            if not program_data:
                return
            
            # Route to appropriate decoder based on protocol
            event = self._decode_event(protocol, program_data)
            if event:
                self._print_event(protocol, event)
                
        except Exception as e:
            print(f"Error processing log entry: {e}")
    
    def _identify_protocol(self, logs: List[str]) -> Optional[str]:
        """Identify which DeFi protocol generated these logs"""
        log_text = ' '.join(logs)
        
        # The first match wins: a Jupiter route that calls Raydium or
        # Pump.fun mentions several program IDs and is labeled 'jupiter'
        if JUPITER_PROGRAM_ID in log_text:
            return 'jupiter'
        elif PUMP_FUN_PROGRAM_ID in log_text:
            return 'pump'
        elif RAYDIUM_V4_PROGRAM_ID in log_text:
            return 'raydium'
        
        return None
    
    def _extract_program_data(self, logs: List[str]) -> Optional[bytes]:
        """Return the first base64 'Program data:' payload, decoded to bytes"""
        for log in logs:
            match = self.program_data_pattern.search(log)
            if match:
                try:
                    return base64.b64decode(match.group(1))
                except Exception as e:
                    print(f"Failed to decode base64: {e}")
        return None
    
    def _decode_event(self, protocol: str, data: bytes) -> Optional[Any]:
        """Decode event data based on protocol"""
        try:
            if protocol == 'jupiter':
                return decode_jupiter_swap(data)
            # Add other protocol decoders here
            # elif protocol == 'pump':
            #     return decode_pump_trade(data)
            # elif protocol == 'raydium':
            #     return decode_raydium_swap(data)
        except Exception as e:
            print(f"Failed to decode {protocol} event: {e}")
        
        return None
    
    def _print_event(self, protocol: str, event: Any) -> None:
        """Print formatted event to console"""
        protocol_names = {
            'jupiter': 'JUPITER SWAP EVENT',
            'pump': 'PUMP TRADE EVENT',
            'raydium': 'RAYDIUM SWAP EVENT'
        }
        
        print(f"\n=== {protocol_names.get(protocol, 'UNKNOWN')} ===")
        # Dataclass events are printed as JSON, as in the preview output
        print(json.dumps(asdict(event)) if is_dataclass(event) else event)
        print("-" * 50)
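_identify_protocol labels a whole transaction with the first program ID it finds, and _extract_program_data takes the first "Program data:" line. When a Jupiter route calls into Pump.fun or Raydium, a Pump.fun event can therefore be handed to the Jupiter decoder. One way to fix this, sketched below as a hypothetical helper, is to replay the runtime’s "Program <id> invoke [n]" and "Program <id> success" / "failed" lines as a call stack, so each payload is attributed to the program that was executing when it was logged.

```python
import re
from typing import List, Tuple

INVOKE = re.compile(r"^Program (\w+) invoke \[\d+\]$")
EXIT = re.compile(r"^Program (\w+) (?:success|failed)")
PROGRAM_DATA = re.compile(r"^Program data: ([A-Za-z0-9+/=]+)")


def attribute_program_data(logs: List[str]) -> List[Tuple[str, str]]:
    """Pair each 'Program data:' payload with the program that logged it.

    The invoke/success/failed lines are treated as pushes and pops on a
    call stack; the program on top of the stack is the one executing.
    """
    stack: List[str] = []
    results: List[Tuple[str, str]] = []
    for line in logs:
        if m := INVOKE.match(line):
            stack.append(m.group(1))
        elif m := EXIT.match(line):
            if stack:
                stack.pop()
        elif (m := PROGRAM_DATA.match(line)) and stack:
            results.append((stack[-1], m.group(1)))
    return results
```

Each (program_id, base64) pair can then be routed to the decoder for that program ID rather than to the decoder of whichever ID appeared first in the transaction.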

Step 7: Create the WebSocket Connection

Now for the exciting part: connecting to Solana’s live data stream!

Create src/wss.py:

# src/wss.py
import json
import time

import websocket

from .constants import WSS_ENDPOINT, JUPITER_PROGRAM_ID, PUMP_FUN_PROGRAM_ID, RAYDIUM_V4_PROGRAM_ID
from .event_processor import EventProcessor

RECONNECT_DELAY_SECONDS = 5

class SolanaWebSocketClient:
    """WebSocket client for Solana log subscriptions"""
    
    def __init__(self):
        self.ws = None
        self.event_processor = EventProcessor()
        self.subscription_ids = []
    
    def on_message(self, ws, message: str):
        """Handle incoming WebSocket messages"""
        try:
            data = json.loads(message)
            
            # Log notifications carry the transaction logs under params.result.value
            if data.get('method') == 'logsNotification':
                log_entry = data['params']['result']['value']
                self.event_processor.process_log_entry(log_entry)
            # Replies to our requests: an error, or {"id": n, "result": <subscription id>}
            elif 'error' in data:
                print(f"RPC error for request {data.get('id')}: {data['error']}")
            elif 'result' in data:
                self.subscription_ids.append(data['result'])
                
        except Exception as e:
            print(f"Error processing message: {e}")
    
    def on_error(self, ws, error):
        """Handle WebSocket errors"""
        print(f"WebSocket error: {error}")
    
    def on_close(self, ws, close_status_code, close_msg):
        """Handle WebSocket close"""
        print("WebSocket connection closed")
    
    def on_open(self, ws):
        """Handle WebSocket open - subscribe to logs"""
        print("WebSocket connection opened")
        # Subscription IDs from a previous connection are no longer valid
        self.subscription_ids = []
        
        # Subscribe to logs from each protocol
        protocols = [
            ("Jupiter", JUPITER_PROGRAM_ID),
            ("Pump.fun", PUMP_FUN_PROGRAM_ID),
            ("Raydium V4", RAYDIUM_V4_PROGRAM_ID)
        ]
        
        # "mentions" accepts a single program ID per request, so send one
        # logsSubscribe per protocol, each with its own request id
        for request_id, (name, program_id) in enumerate(protocols, start=1):
            subscription = {
                "jsonrpc": "2.0",
                "id": request_id,
                "method": "logsSubscribe",
                "params": [
                    {"mentions": [program_id]},
                    # "finalized" is the safest level but lags the chain tip;
                    # "confirmed" delivers notifications sooner
                    {"commitment": "finalized"}
                ]
            }
            
            ws.send(json.dumps(subscription))
            print(f"Subscribed to {name} logs...")
    
    def start(self):
        """Start the WebSocket connection with reconnection logic"""
        while True:
            try:
                print(f"Connecting to {WSS_ENDPOINT}...")
                
                self.ws = websocket.WebSocketApp(
                    WSS_ENDPOINT,
                    on_message=self.on_message,
                    on_error=self.on_error,
                    on_close=self.on_close,
                    on_open=self.on_open
                )
                
                # Blocks until the connection drops; periodic pings help
                # keep an idle connection from being closed
                self.ws.run_forever(ping_interval=30, ping_timeout=10)
                
            except Exception as e:
                print(f"Connection failed: {e}")
            
            # run_forever() usually returns instead of raising, so wait
            # on every pass before reconnecting
            print(f"Reconnecting in {RECONNECT_DELAY_SECONDS} seconds...")
            time.sleep(RECONNECT_DELAY_SECONDS)

def start_websocket():
    """Entry point to start the WebSocket client"""
    client = SolanaWebSocketClient()
    client.start()
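To exercise the message handling without waiting for live traffic, it helps to know the two shapes the server sends: a reply to each logsSubscribe request carrying the subscription ID, and logsNotification messages carrying a transaction’s signature, error status, and logs. The sketch below uses a hypothetical helper and hand-written sample messages modeled on the shapes in Solana’s logsSubscribe documentation; the signature, slot, and subscription ID are placeholders.

```python
import json
from typing import List, Optional, Tuple


def parse_notification(raw: str) -> Optional[Tuple[str, List[str]]]:
    """Return (signature, logs) for a successful transaction's logsNotification.

    Returns None for subscription replies, RPC errors, and failed
    transactions (those still produce logs, but 'err' is not null).
    """
    msg = json.loads(raw)
    if msg.get("method") != "logsNotification":
        return None
    value = msg["params"]["result"]["value"]
    if value.get("err") is not None:
        return None
    return value["signature"], value.get("logs") or []


# Placeholder messages in the documented shapes
confirmation = '{"jsonrpc": "2.0", "result": 24040, "id": 1}'
notification = json.dumps({
    "jsonrpc": "2.0",
    "method": "logsNotification",
    "params": {
        "result": {
            "context": {"slot": 5208469},
            "value": {
                "signature": "placeholder-signature",
                "err": None,
                "logs": ["Program data: SGVsbG8gV29ybGQ="],
            },
        },
        "subscription": 24040,
    },
})

print(parse_notification(confirmation))  # None
print(parse_notification(notification))
```

Saving a few real notifications from a live run and replaying them through a helper like this makes it possible to iterate on decoders offline.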

Testing Your Scraper

Step 8: Run Your First Test

Let’s see if everything works:

poetry run solana-defi-scraper

You should see:

Starting Solana DeFi Scraper...
Monitoring Jupiter, Pump.fun, and Raydium protocols...
Press Ctrl+C to stop
--------------------------------------------------
Connecting to wss://api.mainnet-beta.solana.com/...
WebSocket connection opened
Subscribed to Jupiter logs...
Subscribed to Pump.fun logs...
Subscribed to Raydium V4 logs...

Troubleshooting tips:

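  • If you see connection errors, check your internet connection. The public endpoint api.mainnet-beta.solana.com is rate-limited, so for sustained use point WSS_ENDPOINT at a dedicated RPC provider’s WebSocket URL
  • If no events appear, remember that only protocols with a decoder wired into _decode_event produce output (the Pump.fun and Raydium branches stay commented out until you add those decoders), and that "finalized" commitment delays notifications by several seconds
  • If you see decode errors, the layout probably does not match the event’s actual byte structure; compare it against the program’s IDL and check for the 8-byte Anchor event discriminator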
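The client waits a fixed 5 seconds before reconnecting. If the endpoint keeps refusing connections, for example because of rate limiting, a capped exponential backoff with jitter is a common alternative. The generator below is a hypothetical helper, not part of the project: it yields delays that double from base up to cap seconds, each scaled by a random factor so that many dropped clients do not reconnect in lockstep.

```python
import random
from typing import Iterator


def backoff_delays(base: float = 1.0, cap: float = 60.0, jitter: float = 0.1) -> Iterator[float]:
    """Yield reconnect delays that double from `base` up to `cap` seconds.

    Each delay is scaled by a random factor in [1 - jitter, 1 + jitter].
    """
    attempt = 0
    while True:
        delay = min(cap, base * 2 ** attempt)
        yield delay * random.uniform(1 - jitter, 1 + jitter)
        # Stop growing the exponent once the cap is reached
        if delay < cap:
            attempt += 1
```

In start(), the fixed sleep could become time.sleep(next(delays)), with delays = backoff_delays() created before the loop and recreated once a connection has stayed up for a while, so a single later drop does not inherit a long delay.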

Step 9: Understanding the Data Flow

Here’s what happens when a DeFi transaction occurs:

  1. User executes a swap on Jupiter/Pump.fun/Raydium
  2. Solana processes the transaction and emits logs
  3. Our WebSocket receives the logs in real-time
  4. EventProcessor identifies which protocol the logs belong to
  5. Layout decoder extracts structured data from base64 payload
  6. Console displays the decoded event


Congratulations!

You’ve built a real-time Solana DeFi event scraper that connects to Solana’s live data stream via WebSocket, filters events from major DeFi protocols, and decodes raw blockchain data into readable Python objects, laying the foundation for more complex DeFi applications. As next steps, you can enhance it by:

  • Building a trading bot that reacts to specific events
  • Creating a dashboard to visualize DeFi activity
  • Enriching events with token prices from APIs
  • Implementing webhooks to notify other systems
  • Scaling with async processing for higher throughput

Happy scraping!

