X402 Helio Pay x Browser Use

This documentation outlines how to integrate the X402 protocol with Helio Pay (formerly Solana Pay) and Browser-Use API to create autonomous AI-driven payment solutions.

Contents

Overview

This integration enables AI agents to autonomously engage with e-commerce platforms, make payments, and interact with web interfaces while leveraging the speed and efficiency of the Solana blockchain.

X402 Protocol

What is X402?

X402 is an AI-agentic payment protocol built on HTTP 402, designed to be the new standard for internet-native payments. Developed initially by Coinbase, it enables instant stablecoin payments directly through HTTP, creating a frictionless payment infrastructure for AI agents, apps, and APIs.

Key Features

  • AI-Agentic Focus: Designed for AI agents to autonomously pay for digital services

  • HTTP-Native: Works with standard HTTP infrastructure without specialized setup

  • Stablecoin-Based: Uses stablecoins for payments, ensuring price stability

  • Frictionless Transactions: Eliminates account creation, API keys, and KYC requirements

  • Pay-Per-Use Model: Enables paying only for resources actually consumed

How X402 Works

  1. An AI agent requests access to a resource via HTTP

  2. The server responds with a 402 "Payment Required" status and price details

  3. The agent makes payment using stablecoins via HTTP headers

  4. A payment facilitator verifies and settles the payment

  5. The server grants access to the requested resource

Helio Pay Integration

Helio Pay (the evolution of Solana Pay) provides a seamless way to integrate cryptocurrency payments with e-commerce platforms like Shopify. This section covers the key features of Helio Pay that enable X402 protocol implementation.

Setting Up Webhooks for Shopify

To receive notifications about payments made via Helio Pay on your Shopify store:

Prerequisites

  1. Log in to the Shopify Merchant Dashboard

  2. Retrieve your API Keys

  3. Obtain your Shopify Pay Link ID (found at the end of the URL of your Shopify Pay Link)

Webhook Payload Example

{
  "event": "CREATED",
  "transactionObject": {
    "id": "67d0163484e9de99088b8c05",
    "paylinkId": "66b3ae45d561f776b94873f0",
    "fee": "200",
    "quantity": 1,
    "createdAt": "2025-03-11T10:53:40.016Z",
    "paymentType": "PAYLINK",
    "meta": {
      "id": "67d0163484e9de99088b8c03",
      "amount": "19800",
      "senderPK": "89ZFMYrbVFA8vwuw1wzPHQBHYCJsPuMpE9izfcKeb5Qp",
      "recipientPK": "Bd1EQQasAcpudCWYXZVL2tSjktJkCJVqPU2Nd6VNrEEV",
      "transactionType": "PAYLINK",
      "customerDetails": {},
      "productDetails": null,
      "additionalProductDetails": [],
      "transactionSignature": "3RXXiJxRjVE3ypdEzWvcNW2ASb3gc1C4Vc4U2KJJwcrzPjKtM5yep1Gmz4vDhutf229m3ApZtb9sgrMhLYMYDQYr",
      "transactionStatus": "SUCCESS",
      "splitRevenue": false,
      "remainingAccounts": [],
      "totalAmount": "19800",
      "affiliateAmount": "0",
      "tokenQuote": {
        "from": "USDC",
        "fromAmountDecimal": "0.02",
        "to": "USDC",
        "toAmountMinimal": "20000"
      },
      "shopifyPaymentDetails": {
        "shopifyPaymentGid": "gid://shopify/PaymentSession/rVVb9k5vukGQollX60oEUQi0L",
        "shopifyPaymentId": "rVVb9k5vukGQollX60oEUQi0L"
      },
      "submitGeolocation": "GB",
      "currency": {
        "id": "6340313846e4f91b8abc519b",
        "blockchain": null
      }
    }
  }
}

Creating Charges via API

To programmatically create charges for Pay Links (crucial for X402 protocol implementation):

Prerequisites

  1. Create a Helio account and generate API keys

  2. Create a Pay Link

API Endpoint and Authentication

  • Endpoint: https://api.hel.io/v1/charge/api-key

  • Method: POST

  • Required Query Parameters: ?apiKey={{Your public key here}}

  • Required Headers: Authorization: Bearer {{Your API Token here}}

Example Request

{
    "paymentRequestId": "667974dbb38d45b726751902",
    "requestAmount": "0.2"
}

Example with Additional Data

You can include additional information via the additionalJSON field:

{
  "paymentRequestId": "66c49e24701f6930ee9c77dd",
  "requestAmount": "0.01",
  "prepareRequestBody": {
    "customerDetails": {
      "additionalJSON": "{\"x402_request_id\":\"abc123\",\"resource_url\":\"https://api.example.com/premium/data\"}"
    }
  }
}

Example Response

{
    "id": "667c375be240df9843f7f4f9",
    "pageUrl": "https://app.hel.io//charge/87e3482c-669b-4c62-ab01-23239889acb4"
}

Example cURL Request

curl --location 'https://api.hel.io/v1/charge/api-key?apiKey=<YOUR_PUBLIC_API_KEY>' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY' \
--data '{
    "paymentRequestId": "667974dbb38d45b726751902",
    "requestAmount": "0.2"
}'

Embedded Widget Integration

You can embed a Helio widget on any website to create checkout flows, ideal for X402 protocol user interfaces:

{
  "paylinkId": "657729a959531071881ed7c5", // Use your own paylink ID!
  "display": "new-tab",
  "theme": {
    "themeMode": "light"
  }
}

For inline charges without redirecting, useful for AI-agent-driven payment flows:

{
  "chargeToken": "54cdf9b3-5793-4046-9ee8-bb62574bc4ae",
  "theme": { "themeMode": "light" },
  "primaryColor": "#FF0000",
  "neutralColor": "#2D6FE1",
  "display": "inline",
}

Subscription Management

X402 can be especially powerful for managing subscriptions to premium services. Helio Pay provides comprehensive subscription management capabilities:

Creating Subscription Pay Links

{
  "template": "SUBSCRIPTION",
  "name": "Premium API Access",
  "price": "1000000", // 1 USDC
  "pricingCurrency": "6340313846e4f91b8abc519b", // USDC currency ID
  "features": {
    "isSubscription": true
  },
  "recipients": [
    {
      "walletId": "YOUR_WALLET_ID",
      "currencyId": "6340313846e4f91b8abc519b"
    }
  ],
  "subscriptionDetails": {
    "renewalReminders": 3,
    "gracePeriod": 2,
    "annualDiscountBps": 1000, // 10% discount for annual subscriptions
    "interval": "MONTH"
  }
}

Querying Subscriptions

You can retrieve all subscriptions or a specific subscription to check status:

# Get all subscriptions
curl --location 'https://api.hel.io/v1/subscriptions?apiKey=YOUR_PUBLIC_API_KEY' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY'

# Get a specific subscription
curl --location 'https://api.hel.io/v1/subscriptions/67a0d2ea18c700e1cda9f1b8?apiKey=YOUR_PUBLIC_API_KEY' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY'

This allows X402 agents to verify subscription status before granting access to protected resources.

Transaction Tracking

For X402 implementations, tracking and verifying transactions is crucial. Helio Pay offers comprehensive transaction tracking:

Get All Transactions

curl --location 'https://api.hel.io/v1/export/payments?apiKey=YOUR_PUBLIC_API_KEY' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY'

Get Transactions for a Specific Pay Link

curl --location 'https://api.hel.io/v1/paylink/PAYLINK_ID/transactions?apiKey=YOUR_PUBLIC_API_KEY&from=2024-01-16T15:30:00.000Z&to=2025-01-16T15:30:00.000Z' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY'

This lets X402 implementations verify payment status and maintain an audit trail of resource access.

Merchant Account Creation

For platforms implementing X402 protocol for their customers, programmatic merchant account creation is essential:

Create a New Merchant Account

{
  "id": "c81aede4-c247-431f-84b8-16629c983abe",
  "company": {
    "name": "API Provider Inc.",
    "email": "api@provider.com",
    "phoneNumber": "+1234567890",
    "discordUsername": "api-provider",
    "websiteUrl": "https://api.provider.com",
    "kycVerified": true
  },
  "user": {
    "name": "API Admin",
    "email": "admin@api.provider.com"
  },
  "wallets": [
    {
      "name": "API Revenue Wallet",
      "address": "DsUFWNpve7hVQdhWDAfUTGq78SpZZ3tVLdnJiRa54QBU"
    }
  ]
}

This feature requires platform-level API access. Contact hi@hel.io for partnership details.

Redirect with Query Parameters

You can configure redirects after successful payments, useful for completing the X402 payment flow:

{
  "template": "OTHER",
  "name": "API Access Payment",
  "price": "10000",
  "pricingCurrency": "63430c8348c610068bcdc482",
  "features": {
    "shouldRedirectOnSuccess": true,
    "hasRedirectQueryParams": true
  },
  "recipients": [
    {
      "walletId": "YOUR_WALLET_ID",
      "currencyId": "63430c8348c610068bcdc482"
    }
  ],
  "redirectUrl": "https://api.example.com/access/verify",
  "redirectQueryParams": [{"queryParamType": "WALLET_ADDRESS"}]
}

After payment, users will be redirected to: https://api.example.com/access/verify?WALLET_ADDRESS=7YancRyNQyp9s6G7YNwx9H93UqswoKWqF9GuNJPufyvW

This completes the X402 payment flow by returning the user to the resource with verification parameters.


## Browser-Use API Integration

The Browser-Use API enables AI agents to automate web interaction, making it perfect for implementing X402-powered payment flows.

### Basic Implementation

Use the following Python code to create and monitor browser automation tasks:

```python
import json
import time
import requests

API_KEY = 'your_api_key_here'
BASE_URL = 'https://api.browser-use.com/api/v1'
HEADERS = {'Authorization': f'Bearer {API_KEY}'}

def create_task(instructions: str):
    """Create a new browser automation task"""
    response = requests.post(f'{BASE_URL}/run-task', headers=HEADERS, 
                            json={'task': instructions})
    return response.json()['id']

def get_task_details(task_id: str):
    """Get full task details including output"""
    response = requests.get(f'{BASE_URL}/task/{task_id}', headers=HEADERS)
    return response.json()

def wait_for_completion(task_id: str, poll_interval: int = 2):
    """Poll task status until completion"""
    count = 0
    unique_steps = []
    while True:
        details = get_task_details(task_id)
        new_steps = details['steps']
        # use only the new steps that are not in unique_steps.
        if new_steps != unique_steps:
            for step in new_steps:
                if step not in unique_steps:
                    print(json.dumps(step, indent=4))
            unique_steps = new_steps
        count += 1
        status = details['status']

        if status in ['finished', 'failed', 'stopped']:
            return details
        time.sleep(poll_interval)

def main():
    task_id = create_task('Open https://www.google.com and search for openai')
    print(f'Task created with ID: {task_id}')
    task_details = wait_for_completion(task_id)
    print(f"Final output: {task_details['output']}")

if __name__ == '__main__':
    main()

Task Control Example

def control_task():
    # Create a new task
    task_id = create_task("Go to google.com and search for Browser Use")

    # Wait for 5 seconds
    time.sleep(5)

    # Pause the task
    requests.put(f"{BASE_URL}/pause-task?task_id={task_id}", headers=HEADERS)
    print("Task paused! Check the live preview.")

    # Wait for user input
    input("Press Enter to resume...")

    # Resume the task
    requests.put(f"{BASE_URL}/resume-task?task_id={task_id}", headers=HEADERS)

    # Wait for completion
    result = wait_for_completion(task_id)
    print(f"Task completed with output: {result['output']}")

Structured Output Example

import json
import os
import time
import requests
from pydantic import BaseModel
from typing import List

API_KEY = os.getenv("API_KEY")
BASE_URL = 'https://api.browser-use.com/api/v1'
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

# Define output schema using Pydantic
class SocialMediaCompany(BaseModel):
    name: str
    market_cap: float
    headquarters: str
    founded_year: int

class SocialMediaCompanies(BaseModel):
    companies: List[SocialMediaCompany]

def create_structured_task(instructions: str, schema: dict):
    """Create a task that expects structured output"""
    payload = {
        "task": instructions,
        "structured_output_json": json.dumps(schema)
    }
    response = requests.post(f"{BASE_URL}/run-task", headers=HEADERS, json=payload)
    response.raise_for_status()
    return response.json()["id"]

def wait_for_task_completion(task_id: str, poll_interval: int = 5):
    """Poll task status until it completes"""
    while True:
        response = requests.get(f"{BASE_URL}/task/{task_id}/status", headers=HEADERS)
        response.raise_for_status()
        status = response.json()
        if status == "finished":
            break
        elif status in ["failed", "stopped"]:
            raise RuntimeError(f"Task {task_id} ended with status: {status}")
        print("Waiting for task to finish...")
        time.sleep(poll_interval)

def fetch_task_output(task_id: str):
    """Retrieve the final task result"""
    response = requests.get(f"{BASE_URL}/task/{task_id}", headers=HEADERS)
    response.raise_for_status()
    return response.json()["output"]

def main():
    schema = SocialMediaCompanies.model_json_schema()
    task_id = create_structured_task(
        "Get me the top social media companies by market cap",
        schema
    )
    print(f"Task created with ID: {task_id}")

    wait_for_task_completion(task_id)
    print("Task completed!")

    output = fetch_task_output(task_id)
    print("Raw output:", output)

    try:
        parsed = SocialMediaCompanies.model_validate_json(output)
        print("Parsed output:")
        print(parsed)
    except Exception as e:
        print(f"Failed to parse structured output: {e}")

Integration Architecture

Combining X402, Helio Pay, and Browser-Use creates a powerful system for autonomous AI-driven payments:

Architecture Components

  1. X402 Protocol Layer: Enables standardized payment requests and responses

  2. Solana/Helio Integration Layer: Handles rapid, low-cost payment processing

  3. Browser-Use Automation Layer: Provides web interaction capabilities

  4. AI Agent Layer: Orchestrates the entire workflow

Flow Diagram

┌────────────┐          ┌───────────┐          ┌──────────────┐          ┌──────────┐
│            │  HTTP    │           │  HTTP 402 │              │  Payment │          │
│  AI Agent  │ ────────►│  Service  │ ────────►│ X402 Handler │ ────────►│  Helio   │
│            │  Request │           │  Response │              │  Request │          │
└─────┬──────┘          └───────────┘          └──────────────┘          └────┬─────┘
      │                                                                       │
      │                                                                       │
      │                                                                       │
      │                                         ┌──────────────┐              │
      │                                         │              │              │
      │ Browser-Use                 Payment     │   Solana     │   Payment    │
      └────────────────────────────────────────►│  Blockchain  │◄─────────────┘
        Automation                 Confirmation │              │
                                                └──────────────┘

Implementation Guide

This section provides a step-by-step guide to implementing the X402 protocol with Helio Pay and Browser-Use API.

Step 1: Set Up X402 Server Handler

First, create a server component that implements the X402 protocol:

// Example Express.js implementation
const express = require('express');
const app = express();
const PORT = process.env.PORT || 3000;

// Middleware to parse JSON
app.use(express.json());

// Your protected resources
const premiumContent = {
  'research-paper-001': { title: 'Advanced AI Research Paper', content: 'Premium content here...' },
  'dataset-001': { title: 'Large-scale Training Dataset', format: 'CSV', size: '2.3GB' }
};

// Helio Pay configuration
const HELIO_CONFIG = {
  publicKey: process.env.HELIO_PUBLIC_KEY,
  secretKey: process.env.HELIO_SECRET_KEY,
  payLinkId: process.env.HELIO_PAYLINK_ID
};

// Track payments
const paymentsReceived = {};

// X402 route handler
app.get('/api/premium/:resourceId', async (req, res) => {
  const resourceId = req.params.resourceId;
  const resource = premiumContent[resourceId];
  
  // Resource exists check
  if (!resource) {
    return res.status(404).json({ error: 'Resource not found' });
  }
  
  // Check if payment confirmation is in header
  const paymentConfirmation = req.headers['x-payment-confirmation'];
  if (paymentConfirmation && paymentsReceived[paymentConfirmation]) {
    // Payment verified, serve the resource
    return res.json({ resource });
  }
  
  // Create payment details
  const paymentDetails = {
    payment_request_id: HELIO_CONFIG.payLinkId,
    amount: '0.01', // $0.01 USDC
    currency: 'USDC',
    recipient: 'Your Wallet Address',
    description: `Access to ${resource.title}`,
    requires_browser_interaction: true,
    resource_id: resourceId
  };
  
  // Return 402 Payment Required with payment details
  res.status(402)
     .header('X-Payment-Details', JSON.stringify(paymentDetails))
     .json({ 
       message: 'Payment required to access this resource',
       paymentDetails
     });
});

// Payment verification webhook from Helio
app.post('/webhook/payment', (req, res) => {
  const { transactionObject } = req.body;
  
  if (transactionObject && transactionObject.meta.transactionStatus === 'SUCCESS') {
    // Store payment confirmation
    paymentsReceived[transactionObject.id] = {
      amount: transactionObject.meta.amount,
      timestamp: new Date(),
      sender: transactionObject.meta.senderPK
    };
    
    console.log(`Payment received: ${transactionObject.id}`);
  }
  
  res.status(200).send('OK');
});

app.listen(PORT, () => {
  console.log(`X402 server running on port ${PORT}`);
});

Step 2: Integrate Helio Pay

Create Pay Links for Your Resources

First, create the necessary Pay Links that will be used in the X402 flow:

curl --location 'https://api.hel.io/v1/paylink/create/api-key?apiKey=YOUR_PUBLIC_API_KEY' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY' \
--data '{
    "template": "OTHER",
    "name": "X402 API Access",
    "price": "10000", 
    "pricingCurrency": "6340313846e4f91b8abc519b",
    "features": {
        "requireEmail": true
    },
    "recipients": [
        {
            "walletId": "YOUR_WALLET_ID",
            "currencyId": "6340313846e4f91b8abc519b"
        }
    ],
    "description": "One-time access to premium API resources"
}'

Set Up Webhooks for Payment Notifications

Configure a webhook in your Helio Pay dashboard to send notifications to your server:

Webhook URL: https://your-x402-server.com/webhook/payment
Events to Listen For: Transaction Created, Transaction Completed

Implement Charge Creation for Dynamic Pricing

For resources with dynamic pricing based on complexity, size, or other factors:

// Dynamic pricing handler
async function createHelioPurchase(resourceId, amount) {
  try {
    const response = await fetch(`https://api.hel.io/v1/charge/api-key?apiKey=${HELIO_CONFIG.publicKey}`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${HELIO_CONFIG.secretKey}`
      },
      body: JSON.stringify({
        paymentRequestId: HELIO_CONFIG.payLinkId,
        requestAmount: amount,
        prepareRequestBody: {
          customerDetails: {
            additionalJSON: JSON.stringify({
              resource_id: resourceId,
              request_timestamp: new Date().toISOString()
            })
          }
        }
      })
    });
    
    return await response.json();
  } catch (error) {
    console.error('Error creating charge:', error);
    throw error;
  }
}

Step 3: Set Up Browser-Use API

Create a comprehensive Browser-Use client for automating payment flows:

import os
import json
import requests
import time
from typing import Dict, Any, Optional

class BrowserUseClient:
    """Client for Browser-Use API to automate web interactions"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.browser-use.com/api/v1"
        self.headers = {'Authorization': f'Bearer {api_key}'}
    
    def create_payment_automation(self, payment_url: str, wallet_address: str = None) -> str:
        """Create a task specifically for handling payment flows"""
        instructions = (
            f"Go to {payment_url} and complete the payment process. "
            "Wait for the payment page to load completely. "
            "If there is a 'Connect Wallet' button, click it. "
        )
        
        if wallet_address:
            instructions += f"If asked for a wallet address, use '{wallet_address}'. "
        
        instructions += (
            "Complete any CAPTCHA or verification if presented. "
            "After the payment is completed successfully, "
            "wait for confirmation and extract any transaction ID or confirmation code."
        )
        
        # Define structured output to capture payment results
        payment_schema = {
            "type": "object",
            "properties": {
                "status": {"type": "string", "enum": ["completed", "failed", "pending"]},
                "transaction_id": {"type": "string"},
                "error_message": {"type": "string"},
                "confirmation_page_url": {"type": "string"}
            },
            "required": ["status"]
        }
        
        return self.create_task(instructions, payment_schema)
    
    def create_task(self, instructions: str, structured_output: Dict[str, Any] = None) -> str:
        """Create a new browser automation task"""
        payload = {'task': instructions}
        
        if structured_output:
            payload['structured_output_json'] = json.dumps(structured_output)
        
        response = requests.post(
            f'{self.base_url}/run-task',
            headers=self.headers,
            json=payload
        )
        response.raise_for_status()
        return response.json()['id']
    
    def get_task_details(self, task_id: str) -> Dict[str, Any]:
        """Get full task details including output"""
        response = requests.get(
            f'{self.base_url}/task/{task_id}',
            headers=self.headers
        )
        response.raise_for_status()
        return response.json()
    
    def wait_for_completion(self, task_id: str, poll_interval: int = 2, 
                          timeout: int = 300) -> Dict[str, Any]:
        """Wait for a task to complete with timeout"""
        start_time = time.time()
        unique_steps = []
        
        while time.time() - start_time < timeout:
            details = self.get_task_details(task_id)
            
            # Track progress
            new_steps = details.get('steps', [])
            if new_steps != unique_steps:
                for step in new_steps:
                    if step not in unique_steps:
                        print(f"Step {step.get('step')}: {step.get('next_goal')}")
                unique_steps = new_steps
            
            status = details.get('status')
            if status in ['finished', 'failed', 'stopped']:
                return details
            
            time.sleep(poll_interval)
        
        # Timeout occurred
        self.stop_task(task_id)
        raise TimeoutError(f"Task {task_id} did not complete within {timeout} seconds")
    
    def stop_task(self, task_id: str) -> Dict[str, Any]:
        """Stop a running task"""
        response = requests.put(
            f'{self.base_url}/stop-task?task_id={task_id}',
            headers=self.headers
        )
        response.raise_for_status()
        return response.json()

Step 4: Create X402 Agent

Now, implement a comprehensive X402 agent that ties everything together:

import json
import requests
import os
import time
from typing import Dict, Any, Optional, Union
from pydantic import BaseModel

# Import our clients
from browser_use_client import BrowserUseClient
from helio_pay_client import HelioPayClient

# Define models for structured data
class PaymentDetails(BaseModel):
    payment_request_id: str
    amount: str
    currency: str
    recipient: str
    description: Optional[str] = None
    requires_browser_interaction: bool = False
    resource_id: Optional[str] = None

class PaymentResult(BaseModel):
    status: str  # "success", "failed", "pending"
    transaction_id: Optional[str] = None
    error: Optional[str] = None
    details: Optional[Dict[str, Any]] = None

class Resource(BaseModel):
    url: str
    content_type: str
    data: Any
    payment_required: bool = False
    payment_details: Optional[PaymentDetails] = None

class X402Agent:
    """Agent for handling X402 protocol interactions"""
    
    def __init__(self, browser_use_api_key: str, helio_public_key: str, helio_secret_key: str, 
                 solana_wallet_address: str = None):
        self.browser_client = BrowserUseClient(browser_use_api_key)
        self.helio_client = HelioPayClient(helio_public_key, helio_secret_key)
        self.solana_wallet_address = solana_wallet_address
        self.payment_cache = {}  # Cache payment confirmations
    
    def request_resource(self, url: str, headers: Dict[str, str] = None) -> Resource:
        """Request a resource that might require payment via X402"""
        if headers is None:
            headers = {}
        
        print(f"Requesting resource at {url}")
        response = requests.get(url, headers=headers)
        
        # Create a base resource object
        resource = Resource(
            url=url,
            content_type=response.headers.get('Content-Type', 'text/plain'),
            data=response.text if response.status_code != 402 else None,
            payment_required=False
        )
        
        # Check for 402 Payment Required
        if response.status_code == 402:
            print(f"Resource at {url} requires payment (HTTP 402)")
            
            # Extract payment details from response headers
            payment_header = response.headers.get('X-Payment-Details')
            if not payment_header:
                raise ValueError("Received 402 status but no payment details in headers")
            
            try:
                payment_info = json.loads(payment_header)
                payment_details = PaymentDetails(**payment_info)
                resource.payment_required = True
                resource.payment_details = payment_details
                
                print(f"Payment details: {payment_details.amount} {payment_details.currency} "
                      f"to {payment_details.recipient}")
            except Exception as e:
                raise ValueError(f"Failed to parse payment details: {e}")
        
        return resource
    
    def process_payment(self, payment_details: PaymentDetails) -> PaymentResult:
        """Process a payment according to X402 protocol"""
        print(f"Processing payment: {payment_details.amount} {payment_details.currency}")
        
        try:
            # Create charge via Helio API
            charge_result = self.helio_client.create_charge(
                payment_request_id=payment_details.payment_request_id,
                request_amount=payment_details.amount,
                customer_details={
                    "additionalJSON": json.dumps({
                        "resource_id": payment_details.resource_id,
                        "x402_request": True,
                        "timestamp": time.time()
                    })
                }
            )
            
            charge_id = charge_result.get('id')
            charge_url = charge_result.get('pageUrl')
            
            print(f"Created charge: {charge_id}")
            print(f"Charge URL: {charge_url}")
            
            # If browser interaction is required, use Browser-Use
            if payment_details.requires_browser_interaction:
                payment_url = charge_url
                
                print(f"Payment requires browser interaction, navigating to {payment_url}")
                
                # Create a browser automation task to complete the payment
                task_id = self.browser_client.create_payment_automation(
                    payment_url=payment_url,
                    wallet_address=self.solana_wallet_address
                )
                
                print(f"Created Browser-Use task: {task_id}")
                
                # Wait for the payment process to complete
                task_result = self.browser_client.wait_for_completion(task_id)
                
                if task_result.get('status') == 'finished':
                    output = task_result.get('output', '{}')
                    try:
                        payment_output = json.loads(output)
                        if payment_output.get('status') == 'completed':
                            print("Browser automation task completed successfully")
                            return PaymentResult(
                                status="success",
                                transaction_id=charge_id,
                                details=payment_output
                            )
                        else:
                            print(f"Payment not completed: {payment_output.get('error_message')}")
                            return PaymentResult(
                                status="failed",
                                error=payment_output.get('error_message', 'Unknown error'),
                                details=payment_output
                            )
                    except json.JSONDecodeError:
                        # Handle unstructured output
                        if "success" in output.lower() or "confirmed" in output.lower():
                            return PaymentResult(
                                status="success",
                                transaction_id=charge_id,
                                details={"raw_output": output}
                            )
                        else:
                            return PaymentResult(
                                status="failed",
                                error="Could not confirm payment completion",
                                details={"raw_output": output}
                            )
                else:
                    print(f"Browser automation failed: {task_result}")
                    return PaymentResult(
                        status="failed",
                        error="Browser automation failed",
                        details=task_result
                    )
            
            # For non-browser payments, assume success (in a real implementation, you'd check the status)
            return PaymentResult(
                status="success",
                transaction_id=charge_id
            )
            
        except Exception as e:
            print(f"Payment failed: {e}")
            return PaymentResult(
                status="failed",
                error=str(e)
            )
    
    def access_resource_with_payment(self, url: str) -> Resource:
        """Access a resource, handling payment if required"""
        # First, try to access without payment
        resource = self.request_resource(url)
        
        # If payment is required, process it
        if resource.payment_required and resource.payment_details:
            print("Resource requires payment. Initiating payment flow...")
            
            payment_result = self.process_payment(resource.payment_details)
            
            if payment_result.status == "success":
                print(f"Payment successful. Transaction ID: {payment_result.transaction_id}")
                
                # Cache the successful payment
                self.payment_cache[url] = payment_result.transaction_id
                
                # Retry the request with payment confirmation header
                headers = {
                    'X-Payment-Confirmation': payment_result.transaction_id,
                    'X-Transaction-ID': payment_result.transaction_id
                }
                
                # Request the resource again, now with payment confirmation
                return self.request_resource(url, headers)
            else:
                print(f"Payment failed: {payment_result.error}")
                # Return the original resource with payment_required still set to True
                return resource
        
        return resource
    
    def batch_process_resources(self, urls: list[str]) -> Dict[str, Resource]:
        """Process multiple resources in batch, optimizing for payment efficiency"""
        results = {}
        
        for url in urls:
            # Check if we already paid for this domain
            domain = url.split('//')[-1].split('/')[0]
            
            # Try to use cached payment tokens for the same domain
            existing_payment = None
            for cached_url, payment_id in self.payment_cache.items():
                if domain in cached_url:
                    existing_payment = payment_id
                    break
            
            if existing_payment:
                # Try using the existing payment token
                headers = {'X-Payment-Confirmation': existing_payment}
                resource = self.request_resource(url, headers)
                
                # If it worked, add to results
                if not resource.payment_required:
                    results[url] = resource
                    continue
            
            # Otherwise process normally
            results[url] = self.access_resource_with_payment(url)
        
        return results

Step 5: Connect Everything in a Complete Example

Here's a complete example that ties everything together:

import os
import json
import argparse
from x402_agent import X402Agent

def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='X402 Agent Demo')
    parser.add_argument('--url', type=str, help='URL of the resource to access')
    parser.add_argument('--batch', action='store_true', help='Process URLs in batch mode')
    parser.add_argument('--urls-file', type=str, help='File containing URLs to process in batch')
    args = parser.parse_args()
    
    # Load configuration
    browser_use_api_key = os.getenv('BROWSER_USE_API_KEY')
    helio_public_key = os.getenv('HELIO_PUBLIC_KEY')
    helio_secret_key = os.getenv('HELIO_SECRET_KEY')
    solana_wallet = os.getenv('SOLANA_WALLET_ADDRESS')
    
    # Initialize the X402 agent
    agent = X402Agent(
        browser_use_api_key=browser_use_api_key,
        helio_public_key=helio_public_key,
        helio_secret_key=helio_secret_key,
        solana_wallet_address=solana_wallet
    )
    
    if args.batch and args.urls_file:
        # Batch process mode
        with open(args.urls_file, 'r') as f:
            urls = [line.strip() for line in f.readlines() if line.strip()]
        
        print(f"Processing {len(urls)} URLs in batch mode...")
        results = agent.batch_process_resources(urls)
        
        # Output results summary
        print("\nResults Summary:")
        for url, resource in results.items():
            if resource.payment_required:
                print(f"❌ {url}: Payment required but not completed")
            else:
                print(f"✅ {url}: Successfully accessed ({len(str(resource.data))} chars)")
                
                # Save content to file
                filename = url.split('/')[-1] or 'resource'
                with open(f"{filename}.data", 'w') as f:
                    f.write(str(resource.data))
    
    elif args.url:
        # Single URL mode
        try:
            resource = agent.access_resource_with_payment(args.url)
            
            if not resource.payment_required:
                print(f"Successfully accessed resource: {args.url}")
                print(f"Content Type: {resource.content_type}")
                print(f"Data length: {len(str(resource.data))} characters")
                
                # Ask if user wants to save the content
                save = input("Save content to file? (y/n): ").lower() == 'y'
                if save:
                    filename = input("Enter filename (default: resource.data): ") or "resource.data"
                    with open(filename, 'w') as f:
                        f.write(str(resource.data))
                    print(f"Content saved to {filename}")
            else:
                print(f"Failed to access resource: Payment required but not completed")
        except Exception as e:
            print(f"Error accessing resource: {e}")
    else:
        print("Please provide a URL with --url or use --batch with --urls-file")

if __name__ == "__main__":
    main()

Advanced Use Cases

AI Research Assistant

An AI agent that autonomously searches and pays for premium academic content:

class ResearchAssistant:
    def __init__(self, x402_agent):
        self.agent = x402_agent
        self.research_cache = {}
    
    async def research_topic(self, topic, max_budget=5.0):
        """Research a specialized topic with a maximum budget in USDC"""
        print(f"Researching topic: {topic} (budget: ${max_budget} USDC)")
        
        # Step 1: Identify relevant premium databases
        databases = [
            "https://premium-papers.example.com/api/search",
            "https://academic-data.example.org/query",
            "https://research-central.example.net/papers"
        ]
        
        search_results = []
        total_spent = 0.0
        
        # Step 2: Query each database with topic
        for db_url in databases:
            if total_spent >= max_budget:
                print(f"Budget limit reached (${total_spent}/${max_budget})")
                break
                
            query_url = f"{db_url}?q={topic}"
            print(f"Querying {db_url}...")
            
            # Step 3: Make HTTP request and handle 402 responses
            resource = self.agent.access_resource_with_payment(query_url)
            
            if not resource.payment_required:
                print(f"Successfully accessed {db_url}")
                try:
                    data = json.loads(resource.data)
                    papers = data.get('papers', [])
                    search_results.extend(papers)
                    
                    # Track spending if payment was made
                    if query_url in self.agent.payment_cache:
                        # Assuming 0.01 USDC per query
                        total_spent += 0.01
                except:
                    print(f"Error parsing results from {db_url}")
            else:
                print(f"Failed to access {db_url} - payment required but not completed")
        
        # Step 4: For each promising paper, get full text if needed
        full_papers = []
        for paper in search_results[:5]:  # Limit to top 5 papers
            if total_spent >= max_budget:
                print(f"Budget limit reached (${total_spent}/${max_budget})")
                break
                
            paper_url = paper.get('full_text_url')
            if paper_url:
                print(f"Accessing full paper: {paper.get('title')}")
                
                # Step 5: Access the paper, handling payment if required
                paper_resource = self.agent.access_resource_with_payment(paper_url)
                
                if not paper_resource.payment_required:
                    print(f"Successfully accessed paper: {paper.get('title')}")
                    full_papers.append({
                        'title': paper.get('title'),
                        'authors': paper.get('authors'),
                        'abstract': paper.get('abstract'),
                        'full_text': paper_resource.data
                    })
                    
                    # Track spending
                    if paper_url in self.agent.payment_cache:
                        # Assuming papers cost 0.05 USDC
                        total_spent += 0.05
                else:
                    print(f"Failed to access paper: {paper.get('title')}")
        
        # Step 6: Compile final research results
        research_report = {
            'topic': topic,
            'papers_found': len(search_results),
            'papers_accessed': len(full_papers),
            'full_papers': full_papers,
            'total_spent': total_spent
        }
        
        # Cache results for future use
        self.research_cache[topic] = research_report
        
        return research_report

Automated E-commerce with Inventory Management

Implement inventory management that can autonomously reorder supplies:

class AutomatedInventoryManager:
    def __init__(self, x402_agent, inventory_api_url, reorder_threshold=10):
        self.agent = x402_agent
        self.inventory_api = inventory_api_url
        self.threshold = reorder_threshold
        self.supplier_catalog = {}
        self.order_history = []
    
    async def check_inventory_levels(self):
        """Monitor inventory and reorder when needed"""
        
        # Step 1: Get current inventory levels
        inventory_url = f"{self.inventory_api}/current"
        inventory = requests.get(inventory_url).json()
        
        low_stock_items = []
        for item, details in inventory.items():
            if details['quantity'] <= self.threshold:
                low_stock_items.append({
                    'sku': item,
                    'name': details['name'],
                    'current_quantity': details['quantity'],
                    'reorder_quantity': details['reorder_amount']
                })
        
        if not low_stock_items:
            print("All inventory levels are sufficient")
            return
        
        print(f"Found {len(low_stock_items)} items below threshold")
        
        # Step 2: For each low stock item, find suppliers
        for item in low_stock_items:
            print(f"Finding suppliers for {item['name']} (SKU: {item['sku']})")
            
            # Use cached supplier info or find new suppliers
            if item['sku'] in self.supplier_catalog:
                suppliers = self.supplier_catalog[item['sku']]
            else:
                # Step 3: Query supplier marketplace
                supplier_url = f"https://supplier-marketplace.example.com/api/find?sku={item['sku']}"
                
                # Use X402 agent to access the marketplace (might require payment)
                resource = self.agent.access_resource_with_payment(supplier_url)
                
                if not resource.payment_required:
                    suppliers = json.loads(resource.data)
                    # Cache supplier information
                    self.supplier_catalog[item['sku']] = suppliers
                else:
                    print(f"Could not access supplier information for {item['name']}")
                    continue
            
            if not suppliers:
                print(f"No suppliers found for {item['name']}")
                continue
            
            # Select best supplier based on price, availability, rating
            best_supplier = min(suppliers, key=lambda s: float(s['price']))
            
            print(f"Selected supplier: {best_supplier['name']} for {item['name']}")
            
            # Step 4: Place order using X402 protocol
            order_url = f"https://supplier-api.example.com/order"
            order_payload = {
                'supplier_id': best_supplier['id'],
                'item_sku': item['sku'],
                'quantity': item['reorder_quantity'],
                'shipping_address': "123 Warehouse Ave, Storage City, SC 12345"
            }
            
            # Prepare the order (this might return a 402 status requiring payment)
            headers = {'Content-Type': 'application/json'}
            order_response = requests.post(order_url, json=order_payload, headers=headers)
            
            if order_response.status_code == 402:
                # Handle payment required response
                payment_details = json.loads(order_response.headers.get('X-Payment-Details'))
                
                # Process payment
                payment_result = self.agent.process_payment(payment_details)
                
                if payment_result.status == "success":
                    # Confirm order with payment token
                    order_headers = {
                        'Content-Type': 'application/json',
                        'X-Payment-Confirmation': payment_result.transaction_id
                    }
                    
                    final_response = requests.post(order_url, json=order_payload, headers=order_headers)
                    
                    if final_response.status_code == 200:
                        order_confirmation = final_response.json()
                        
                        # Log successful order
                        order_record = {
                            'timestamp': time.time(),
                            'item': item,
                            'supplier': best_supplier,
                            'quantity': item['reorder_quantity'],
                            'total_cost': float(best_supplier['price']) * item['reorder_quantity'],
                            'order_id': order_confirmation['order_id'],
                            'estimated_delivery': order_confirmation['estimated_delivery']
                        }
                        
                        self.order_history.append(order_record)
                        
                        print(f"Successfully ordered {item['reorder_quantity']} units of {item['name']}")
                        print(f"Order ID: {order_confirmation['order_id']}")
                        print(f"Estimated delivery: {order_confirmation['estimated_delivery']}")
                        
                        # Update inventory system
                        update_url = f"{self.inventory_api}/update-on-order"
                        update_payload = {
                            'sku': item['sku'],
                            'on_order': item['reorder_quantity'],
                            'order_id': order_confirmation['order_id']
                        }
                        
                        requests.post(update_url, json=update_payload)
                    else:
                        print(f"Order failed even after payment: {final_response.text}")
                else:
                    print(f"Payment failed: {payment_result.error}")
            elif order_response.status_code == 200:
                # Order accepted without payment (rare, but possible)
                order_confirmation = order_response.json()
                
                # Log successful order
                order_record = {
                    'timestamp': time.time(),
                    'item': item,
                    'supplier': best_supplier,
                    'quantity': item['reorder_quantity'],
                    'total_cost': float(best_supplier['price']) * item['reorder_quantity'],
                    'order_id': order_confirmation['order_id'],
                    'estimated_delivery': order_confirmation['estimated_delivery']
                }
                
                self.order_history.append(order_record)
                
                print(f"Successfully ordered {item['reorder_quantity']} units of {item['name']}")
                print(f"Order ID: {order_confirmation['order_id']}")
                print(f"Estimated delivery: {order_confirmation['estimated_delivery']}")
                
                # Update inventory system
                update_url = f"{self.inventory_api}/update-on-order"
                update_payload = {
                    'sku': item['sku'],
                    'on_order': item['reorder_quantity'],
                    'order_id': order_confirmation['order_id']
                }
                
                requests.post(update_url, json=update_payload)
            else:
                print(f"Order failed: {order_response.text}")

Real-time Data Service and API Provider

Create a service that offers access to real-time financial data using the X402 protocol:

// X402 Financial Data API Server
const express = require('express');
const { HelioPayAPI } = require('./helio-pay-client');
const { SolanaBlockchain } = require('./solana-client');
const { RealTimeDataProvider } = require('./data-provider');

const app = express();
const PORT = process.env.PORT || 3000;

// Initialize our services
const helioPay = new HelioPayAPI({
  publicKey: process.env.HELIO_PUBLIC_KEY,
  secretKey: process.env.HELIO_SECRET_KEY
});

const solana = new SolanaBlockchain({
  rpcUrl: process.env.SOLANA_RPC_URL,
  walletPrivateKey: process.env.WALLET_PRIVATE_KEY
});

const dataProvider = new RealTimeDataProvider();

// Track payments and data usage
const paymentRecords = {};

// Middleware to parse JSON
app.use(express.json());

// Root endpoint with service information
app.get('/', (req, res) => {
  res.json({
    service: 'X402 Financial Data API',
    endpoints: [
      { path: '/api/market/price/:symbol', description: 'Get real-time price for a ticker symbol' },
      { path: '/api/market/ohlc/:symbol', description: 'Get OHLC data for a ticker symbol' },
      { path: '/api/market/depth/:symbol', description: 'Get order book depth for a symbol' },
    ],
    pricing: {
      price_endpoint: '0.001 USDC per call',
      ohlc_endpoint: '0.005 USDC per call',
      depth_endpoint: '0.01 USDC per call'
    },
    payment_method: 'X402 Protocol via Helio Pay on Solana'
  });
});

// Real-time price endpoint with X402 payment requirement
app.get('/api/market/price/:symbol', async (req, res) => {
  const symbol = req.params.symbol.toUpperCase();
  
  // Check for payment confirmation header
  const paymentToken = req.headers['x-payment-confirmation'];
  
  if (paymentToken && verifyPayment(paymentToken, 'price', symbol)) {
    try {
      // Get real-time price data
      const priceData = await dataProvider.getRealTimePrice(symbol);
      
      // Record usage
      recordUsage(paymentToken, 'price', symbol);
      
      // Return the data
      return res.json({
        symbol,
        price: priceData.price,
        currency: priceData.currency,
        timestamp: priceData.timestamp,
        provider: priceData.provider
      });
    } catch (error) {
      return res.status(500).json({ error: 'Failed to fetch price data' });
    }
  }
  
  // If no valid payment token, return 402 Payment Required
  const paymentDetails = {
    payment_request_id: process.env.PRICE_ENDPOINT_PAYLINK_ID,
    amount: '0.001',  // 0.001 USDC
    currency: 'USDC',
    recipient: process.env.MERCHANT_WALLET,
    description: `Real-time price data for ${symbol}`,
    requires_browser_interaction: true,
    resource_path: `/api/market/price/${symbol}`
  };
  
  // Return 402 status with payment details
  res.status(402)
     .header('X-Payment-Details', JSON.stringify(paymentDetails))
     .json({ 
       message: 'Payment required to access this data',
       payment_details: paymentDetails
     });
});

// OHLC data endpoint with X402 payment requirement
app.get('/api/market/ohlc/:symbol', async (req, res) => {
  const symbol = req.params.symbol.toUpperCase();
  const interval = req.query.interval || '1h'; // Default to 1-hour intervals
  
  // Check for payment confirmation header
  const paymentToken = req.headers['x-payment-confirmation'];
  
  if (paymentToken && verifyPayment(paymentToken, 'ohlc', symbol)) {
    try {
      // Get OHLC data
      const ohlcData = await dataProvider.getOHLCData(symbol, interval);
      
      // Record usage
      recordUsage(paymentToken, 'ohlc', symbol);
      
      // Return the data
      return res.json({
        symbol,
        interval,
        data: ohlcData,
        timestamp: Date.now(),
        count: ohlcData.length
      });
    } catch (error) {
      return res.status(500).json({ error: 'Failed to fetch OHLC data' });
    }
  }
  
  // If no valid payment token, return 402 Payment Required
  const paymentDetails = {
    payment_request_id: process.env.OHLC_ENDPOINT_PAYLINK_ID,
    amount: '0.005',  // 0.005 USDC
    currency: 'USDC',
    recipient: process.env.MERCHANT_WALLET,
    description: `OHLC data for ${symbol} (${interval})`,
    requires_browser_interaction: true,
    resource_path: `/api/market/ohlc/${symbol}?interval=${interval}`
  };
  
  // Return 402 status with payment details
  res.status(402)
     .header('X-Payment-Details', JSON.stringify(paymentDetails))
     .json({ 
       message: 'Payment required to access this data',
       payment_details: paymentDetails
     });
});

// Helper functions for payment verification and usage tracking
function verifyPayment(token, endpoint, symbol) {
  if (!paymentRecords[token]) {
    return false;
  }
  
  const record = paymentRecords[token];
  
  // Check if payment is for the right endpoint
  if (record.endpoint !== endpoint) {
    return false;
  }
  
  // Check if payment is still valid (15 minute window)
  const now = Date.now();
  if (now - record.timestamp > 15 * 60 * 1000) {
    return false;
  }
  
  return true;
}

function recordUsage(token, endpoint, symbol) {
  if (paymentRecords[token]) {
    paymentRecords[token].usage.push({
      timestamp: Date.now(),
      endpoint,
      symbol
    });
  }
}

// Webhook endpoint for Helio Pay notifications
app.post('/webhook/helio-payment', (req, res) => {
  const { event, transactionObject } = req.body;
  
  if (event === 'CREATED' && transactionObject && 
      transactionObject.meta.transactionStatus === 'SUCCESS') {
    
    try {
      // Extract custom data from additionalJSON if available
      let customData = {};
      if (transactionObject.meta.customerDetails && 
          transactionObject.meta.customerDetails.additionalJSON) {
        customData = JSON.parse(transactionObject.meta.customerDetails.additionalJSON);
      }
      
      // Determine which endpoint this payment is for
      let endpoint = 'unknown';
      if (transactionObject.paylinkId === process.env.PRICE_ENDPOINT_PAYLINK_ID) {
        endpoint = 'price';
      } else if (transactionObject.paylinkId === process.env.OHLC_ENDPOINT_PAYLINK_ID) {
        endpoint = 'ohlc';
      } else if (transactionObject.paylinkId === process.env.DEPTH_ENDPOINT_PAYLINK_ID) {
        endpoint = 'depth';
      }
      
      // Record the payment
      paymentRecords[transactionObject.id] = {
        id: transactionObject.

Last updated