X402 Helio Pay x Browser Use
This documentation outlines how to integrate the X402 protocol with Helio Pay (formerly Solana Pay) and Browser-Use API to create autonomous AI-driven payment solutions.
Contents
Overview
This integration enables AI agents to autonomously engage with e-commerce platforms, make payments, and interact with web interfaces while leveraging the speed and efficiency of the Solana blockchain.
X402 Protocol
What is X402?
X402 is an AI-agentic payment protocol built on HTTP 402, designed to be the new standard for internet-native payments. Developed initially by Coinbase, it enables instant stablecoin payments directly through HTTP, creating a frictionless payment infrastructure for AI agents, apps, and APIs.
Key Features
AI-Agentic Focus: Designed for AI agents to autonomously pay for digital services
HTTP-Native: Works with standard HTTP infrastructure without specialized setup
Stablecoin-Based: Uses stablecoins for payments, ensuring price stability
Frictionless Transactions: Eliminates account creation, API keys, and KYC requirements
Pay-Per-Use Model: Enables paying only for resources actually consumed
How X402 Works
An AI agent requests access to a resource via HTTP
The server responds with a 402 "Payment Required" status and price details
The agent makes payment using stablecoins via HTTP headers
A payment facilitator verifies and settles the payment
The server grants access to the requested resource
Helio Pay Integration
Helio Pay (the evolution of Solana Pay) provides a seamless way to integrate cryptocurrency payments with e-commerce platforms like Shopify. This section covers the key features of Helio Pay that enable X402 protocol implementation.
Setting Up Webhooks for Shopify
To receive notifications about payments made via Helio Pay on your Shopify store:
Prerequisites
Log in to the Shopify Merchant Dashboard
Retrieve your API Keys
Obtain your Shopify Pay Link ID (found at the end of the URL of your Shopify Pay Link)
Webhook Payload Example
{
"event": "CREATED",
"transactionObject": {
"id": "67d0163484e9de99088b8c05",
"paylinkId": "66b3ae45d561f776b94873f0",
"fee": "200",
"quantity": 1,
"createdAt": "2025-03-11T10:53:40.016Z",
"paymentType": "PAYLINK",
"meta": {
"id": "67d0163484e9de99088b8c03",
"amount": "19800",
"senderPK": "89ZFMYrbVFA8vwuw1wzPHQBHYCJsPuMpE9izfcKeb5Qp",
"recipientPK": "Bd1EQQasAcpudCWYXZVL2tSjktJkCJVqPU2Nd6VNrEEV",
"transactionType": "PAYLINK",
"customerDetails": {},
"productDetails": null,
"additionalProductDetails": [],
"transactionSignature": "3RXXiJxRjVE3ypdEzWvcNW2ASb3gc1C4Vc4U2KJJwcrzPjKtM5yep1Gmz4vDhutf229m3ApZtb9sgrMhLYMYDQYr",
"transactionStatus": "SUCCESS",
"splitRevenue": false,
"remainingAccounts": [],
"totalAmount": "19800",
"affiliateAmount": "0",
"tokenQuote": {
"from": "USDC",
"fromAmountDecimal": "0.02",
"to": "USDC",
"toAmountMinimal": "20000"
},
"shopifyPaymentDetails": {
"shopifyPaymentGid": "gid://shopify/PaymentSession/rVVb9k5vukGQollX60oEUQi0L",
"shopifyPaymentId": "rVVb9k5vukGQollX60oEUQi0L"
},
"submitGeolocation": "GB",
"currency": {
"id": "6340313846e4f91b8abc519b",
"blockchain": null
}
}
}
}
Creating Charges via API
To programmatically create charges for Pay Links (crucial for X402 protocol implementation):
Prerequisites
Create a Helio account and generate API keys
Create a Pay Link
API Endpoint and Authentication
Endpoint:
https://api.hel.io/v1/charge/api-key
Method:
POST
Required Query Parameters:
?apiKey={{Your public key here}}
Required Headers:
Authorization: Bearer {{Your API Token here}}
Example Request
{
"paymentRequestId": "667974dbb38d45b726751902",
"requestAmount": "0.2"
}
Example with Additional Data
You can include additional information via the additionalJSON
field:
{
"paymentRequestId": "66c49e24701f6930ee9c77dd",
"requestAmount": "0.01",
"prepareRequestBody": {
"customerDetails": {
"additionalJSON": "{\"x402_request_id\":\"abc123\",\"resource_url\":\"https://api.example.com/premium/data\"}"
}
}
}
Example Response
{
"id": "667c375be240df9843f7f4f9",
"pageUrl": "https://app.hel.io//charge/87e3482c-669b-4c62-ab01-23239889acb4"
}
Example cURL Request
curl --location 'https://api.hel.io/v1/charge/api-key?apiKey=<YOUR_PUBLIC_API_KEY>' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY' \
--data '{
"paymentRequestId": "667974dbb38d45b726751902",
"requestAmount": "0.2"
}'
Embedded Widget Integration
You can embed a Helio widget on any website to create checkout flows, ideal for X402 protocol user interfaces:
{
"paylinkId": "657729a959531071881ed7c5", // Use your own paylink ID!
"display": "new-tab",
"theme": {
"themeMode": "light"
}
}
For inline charges without redirecting, useful for AI-agent-driven payment flows:
{
"chargeToken": "54cdf9b3-5793-4046-9ee8-bb62574bc4ae",
"theme": { "themeMode": "light" },
"primaryColor": "#FF0000",
"neutralColor": "#2D6FE1",
"display": "inline",
}
Subscription Management
X402 can be especially powerful for managing subscriptions to premium services. Helio Pay provides comprehensive subscription management capabilities:
Creating Subscription Pay Links
{
"template": "SUBSCRIPTION",
"name": "Premium API Access",
"price": "1000000", // 1 USDC
"pricingCurrency": "6340313846e4f91b8abc519b", // USDC currency ID
"features": {
"isSubscription": true
},
"recipients": [
{
"walletId": "YOUR_WALLET_ID",
"currencyId": "6340313846e4f91b8abc519b"
}
],
"subscriptionDetails": {
"renewalReminders": 3,
"gracePeriod": 2,
"annualDiscountBps": 1000, // 10% discount for annual subscriptions
"interval": "MONTH"
}
}
Querying Subscriptions
You can retrieve all subscriptions or a specific subscription to check status:
# Get all subscriptions
curl --location 'https://api.hel.io/v1/subscriptions?apiKey=YOUR_PUBLIC_API_KEY' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY'
# Get a specific subscription
curl --location 'https://api.hel.io/v1/subscriptions/67a0d2ea18c700e1cda9f1b8?apiKey=YOUR_PUBLIC_API_KEY' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY'
This allows X402 agents to verify subscription status before granting access to protected resources.
Transaction Tracking
For X402 implementations, tracking and verifying transactions is crucial. Helio Pay offers comprehensive transaction tracking:
Get All Transactions
curl --location 'https://api.hel.io/v1/export/payments?apiKey=YOUR_PUBLIC_API_KEY' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY'
Get Transactions for a Specific Pay Link
curl --location 'https://api.hel.io/v1/paylink/PAYLINK_ID/transactions?apiKey=YOUR_PUBLIC_API_KEY&from=2024-01-16T15:30:00.000Z&to=2025-01-16T15:30:00.000Z' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY'
This lets X402 implementations verify payment status and maintain an audit trail of resource access.
Merchant Account Creation
For platforms implementing X402 protocol for their customers, programmatic merchant account creation is essential:
Create a New Merchant Account
{
"id": "c81aede4-c247-431f-84b8-16629c983abe",
"company": {
"name": "API Provider Inc.",
"email": "api@provider.com",
"phoneNumber": "+1234567890",
"discordUsername": "api-provider",
"websiteUrl": "https://api.provider.com",
"kycVerified": true
},
"user": {
"name": "API Admin",
"email": "admin@api.provider.com"
},
"wallets": [
{
"name": "API Revenue Wallet",
"address": "DsUFWNpve7hVQdhWDAfUTGq78SpZZ3tVLdnJiRa54QBU"
}
]
}
This feature requires platform-level API access. Contact hi@hel.io for partnership details.
Redirect with Query Parameters
You can configure redirects after successful payments, useful for completing the X402 payment flow:
{
"template": "OTHER",
"name": "API Access Payment",
"price": "10000",
"pricingCurrency": "63430c8348c610068bcdc482",
"features": {
"shouldRedirectOnSuccess": true,
"hasRedirectQueryParams": true
},
"recipients": [
{
"walletId": "YOUR_WALLET_ID",
"currencyId": "63430c8348c610068bcdc482"
}
],
"redirectUrl": "https://api.example.com/access/verify",
"redirectQueryParams": [{"queryParamType": "WALLET_ADDRESS"}]
}
After payment, users will be redirected to: https://api.example.com/access/verify?WALLET_ADDRESS=7YancRyNQyp9s6G7YNwx9H93UqswoKWqF9GuNJPufyvW
This completes the X402 payment flow by returning the user to the resource with verification parameters.
## Browser-Use API Integration
The Browser-Use API enables AI agents to automate web interaction, making it perfect for implementing X402-powered payment flows.
### Basic Implementation
Use the following Python code to create and monitor browser automation tasks:
```python
import json
import time
import requests
API_KEY = 'your_api_key_here'
BASE_URL = 'https://api.browser-use.com/api/v1'
HEADERS = {'Authorization': f'Bearer {API_KEY}'}
def create_task(instructions: str):
"""Create a new browser automation task"""
response = requests.post(f'{BASE_URL}/run-task', headers=HEADERS,
json={'task': instructions})
return response.json()['id']
def get_task_details(task_id: str):
"""Get full task details including output"""
response = requests.get(f'{BASE_URL}/task/{task_id}', headers=HEADERS)
return response.json()
def wait_for_completion(task_id: str, poll_interval: int = 2):
"""Poll task status until completion"""
count = 0
unique_steps = []
while True:
details = get_task_details(task_id)
new_steps = details['steps']
# use only the new steps that are not in unique_steps.
if new_steps != unique_steps:
for step in new_steps:
if step not in unique_steps:
print(json.dumps(step, indent=4))
unique_steps = new_steps
count += 1
status = details['status']
if status in ['finished', 'failed', 'stopped']:
return details
time.sleep(poll_interval)
def main():
task_id = create_task('Open https://www.google.com and search for openai')
print(f'Task created with ID: {task_id}')
task_details = wait_for_completion(task_id)
print(f"Final output: {task_details['output']}")
if __name__ == '__main__':
main()
Task Control Example
def control_task():
# Create a new task
task_id = create_task("Go to google.com and search for Browser Use")
# Wait for 5 seconds
time.sleep(5)
# Pause the task
requests.put(f"{BASE_URL}/pause-task?task_id={task_id}", headers=HEADERS)
print("Task paused! Check the live preview.")
# Wait for user input
input("Press Enter to resume...")
# Resume the task
requests.put(f"{BASE_URL}/resume-task?task_id={task_id}", headers=HEADERS)
# Wait for completion
result = wait_for_completion(task_id)
print(f"Task completed with output: {result['output']}")
Structured Output Example
import json
import os
import time
import requests
from pydantic import BaseModel
from typing import List
API_KEY = os.getenv("API_KEY")
BASE_URL = 'https://api.browser-use.com/api/v1'
HEADERS = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
# Define output schema using Pydantic
class SocialMediaCompany(BaseModel):
name: str
market_cap: float
headquarters: str
founded_year: int
class SocialMediaCompanies(BaseModel):
companies: List[SocialMediaCompany]
def create_structured_task(instructions: str, schema: dict):
"""Create a task that expects structured output"""
payload = {
"task": instructions,
"structured_output_json": json.dumps(schema)
}
response = requests.post(f"{BASE_URL}/run-task", headers=HEADERS, json=payload)
response.raise_for_status()
return response.json()["id"]
def wait_for_task_completion(task_id: str, poll_interval: int = 5):
"""Poll task status until it completes"""
while True:
response = requests.get(f"{BASE_URL}/task/{task_id}/status", headers=HEADERS)
response.raise_for_status()
status = response.json()
if status == "finished":
break
elif status in ["failed", "stopped"]:
raise RuntimeError(f"Task {task_id} ended with status: {status}")
print("Waiting for task to finish...")
time.sleep(poll_interval)
def fetch_task_output(task_id: str):
"""Retrieve the final task result"""
response = requests.get(f"{BASE_URL}/task/{task_id}", headers=HEADERS)
response.raise_for_status()
return response.json()["output"]
def main():
schema = SocialMediaCompanies.model_json_schema()
task_id = create_structured_task(
"Get me the top social media companies by market cap",
schema
)
print(f"Task created with ID: {task_id}")
wait_for_task_completion(task_id)
print("Task completed!")
output = fetch_task_output(task_id)
print("Raw output:", output)
try:
parsed = SocialMediaCompanies.model_validate_json(output)
print("Parsed output:")
print(parsed)
except Exception as e:
print(f"Failed to parse structured output: {e}")
Integration Architecture
Combining X402, Helio Pay, and Browser-Use creates a powerful system for autonomous AI-driven payments:
Architecture Components
X402 Protocol Layer: Enables standardized payment requests and responses
Solana/Helio Integration Layer: Handles rapid, low-cost payment processing
Browser-Use Automation Layer: Provides web interaction capabilities
AI Agent Layer: Orchestrates the entire workflow
Flow Diagram
┌────────────┐ ┌───────────┐ ┌──────────────┐ ┌──────────┐
│ │ HTTP │ │ HTTP 402 │ │ Payment │ │
│ AI Agent │ ────────►│ Service │ ────────►│ X402 Handler │ ────────►│ Helio │
│ │ Request │ │ Response │ │ Request │ │
└─────┬──────┘ └───────────┘ └──────────────┘ └────┬─────┘
│ │
│ │
│ │
│ ┌──────────────┐ │
│ │ │ │
│ Browser-Use Payment │ Solana │ Payment │
└────────────────────────────────────────►│ Blockchain │◄─────────────┘
Automation Confirmation │ │
└──────────────┘
Implementation Guide
This section provides a step-by-step guide to implementing the X402 protocol with Helio Pay and Browser-Use API.
Step 1: Set Up X402 Server Handler
First, create a server component that implements the X402 protocol:
// Example Express.js implementation
const express = require('express');
const app = express();
const PORT = process.env.PORT || 3000;
// Middleware to parse JSON
app.use(express.json());
// Your protected resources
const premiumContent = {
'research-paper-001': { title: 'Advanced AI Research Paper', content: 'Premium content here...' },
'dataset-001': { title: 'Large-scale Training Dataset', format: 'CSV', size: '2.3GB' }
};
// Helio Pay configuration
const HELIO_CONFIG = {
publicKey: process.env.HELIO_PUBLIC_KEY,
secretKey: process.env.HELIO_SECRET_KEY,
payLinkId: process.env.HELIO_PAYLINK_ID
};
// Track payments
const paymentsReceived = {};
// X402 route handler
app.get('/api/premium/:resourceId', async (req, res) => {
const resourceId = req.params.resourceId;
const resource = premiumContent[resourceId];
// Resource exists check
if (!resource) {
return res.status(404).json({ error: 'Resource not found' });
}
// Check if payment confirmation is in header
const paymentConfirmation = req.headers['x-payment-confirmation'];
if (paymentConfirmation && paymentsReceived[paymentConfirmation]) {
// Payment verified, serve the resource
return res.json({ resource });
}
// Create payment details
const paymentDetails = {
payment_request_id: HELIO_CONFIG.payLinkId,
amount: '0.01', // $0.01 USDC
currency: 'USDC',
recipient: 'Your Wallet Address',
description: `Access to ${resource.title}`,
requires_browser_interaction: true,
resource_id: resourceId
};
// Return 402 Payment Required with payment details
res.status(402)
.header('X-Payment-Details', JSON.stringify(paymentDetails))
.json({
message: 'Payment required to access this resource',
paymentDetails
});
});
// Payment verification webhook from Helio
app.post('/webhook/payment', (req, res) => {
const { transactionObject } = req.body;
if (transactionObject && transactionObject.meta.transactionStatus === 'SUCCESS') {
// Store payment confirmation
paymentsReceived[transactionObject.id] = {
amount: transactionObject.meta.amount,
timestamp: new Date(),
sender: transactionObject.meta.senderPK
};
console.log(`Payment received: ${transactionObject.id}`);
}
res.status(200).send('OK');
});
app.listen(PORT, () => {
console.log(`X402 server running on port ${PORT}`);
});
Step 2: Integrate Helio Pay
Create Pay Links for Your Resources
First, create the necessary Pay Links that will be used in the X402 flow:
curl --location 'https://api.hel.io/v1/paylink/create/api-key?apiKey=YOUR_PUBLIC_API_KEY' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer YOUR_SECRET_API_KEY' \
--data '{
"template": "OTHER",
"name": "X402 API Access",
"price": "10000",
"pricingCurrency": "6340313846e4f91b8abc519b",
"features": {
"requireEmail": true
},
"recipients": [
{
"walletId": "YOUR_WALLET_ID",
"currencyId": "6340313846e4f91b8abc519b"
}
],
"description": "One-time access to premium API resources"
}'
Set Up Webhooks for Payment Notifications
Configure a webhook in your Helio Pay dashboard to send notifications to your server:
Webhook URL: https://your-x402-server.com/webhook/payment
Events to Listen For: Transaction Created, Transaction Completed
Implement Charge Creation for Dynamic Pricing
For resources with dynamic pricing based on complexity, size, or other factors:
// Dynamic pricing handler
async function createHelioPurchase(resourceId, amount) {
try {
const response = await fetch(`https://api.hel.io/v1/charge/api-key?apiKey=${HELIO_CONFIG.publicKey}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${HELIO_CONFIG.secretKey}`
},
body: JSON.stringify({
paymentRequestId: HELIO_CONFIG.payLinkId,
requestAmount: amount,
prepareRequestBody: {
customerDetails: {
additionalJSON: JSON.stringify({
resource_id: resourceId,
request_timestamp: new Date().toISOString()
})
}
}
})
});
return await response.json();
} catch (error) {
console.error('Error creating charge:', error);
throw error;
}
}
Step 3: Set Up Browser-Use API
Create a comprehensive Browser-Use client for automating payment flows:
import os
import json
import requests
import time
from typing import Dict, Any, Optional
class BrowserUseClient:
"""Client for Browser-Use API to automate web interactions"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.browser-use.com/api/v1"
self.headers = {'Authorization': f'Bearer {api_key}'}
def create_payment_automation(self, payment_url: str, wallet_address: str = None) -> str:
"""Create a task specifically for handling payment flows"""
instructions = (
f"Go to {payment_url} and complete the payment process. "
"Wait for the payment page to load completely. "
"If there is a 'Connect Wallet' button, click it. "
)
if wallet_address:
instructions += f"If asked for a wallet address, use '{wallet_address}'. "
instructions += (
"Complete any CAPTCHA or verification if presented. "
"After the payment is completed successfully, "
"wait for confirmation and extract any transaction ID or confirmation code."
)
# Define structured output to capture payment results
payment_schema = {
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["completed", "failed", "pending"]},
"transaction_id": {"type": "string"},
"error_message": {"type": "string"},
"confirmation_page_url": {"type": "string"}
},
"required": ["status"]
}
return self.create_task(instructions, payment_schema)
def create_task(self, instructions: str, structured_output: Dict[str, Any] = None) -> str:
"""Create a new browser automation task"""
payload = {'task': instructions}
if structured_output:
payload['structured_output_json'] = json.dumps(structured_output)
response = requests.post(
f'{self.base_url}/run-task',
headers=self.headers,
json=payload
)
response.raise_for_status()
return response.json()['id']
def get_task_details(self, task_id: str) -> Dict[str, Any]:
"""Get full task details including output"""
response = requests.get(
f'{self.base_url}/task/{task_id}',
headers=self.headers
)
response.raise_for_status()
return response.json()
def wait_for_completion(self, task_id: str, poll_interval: int = 2,
timeout: int = 300) -> Dict[str, Any]:
"""Wait for a task to complete with timeout"""
start_time = time.time()
unique_steps = []
while time.time() - start_time < timeout:
details = self.get_task_details(task_id)
# Track progress
new_steps = details.get('steps', [])
if new_steps != unique_steps:
for step in new_steps:
if step not in unique_steps:
print(f"Step {step.get('step')}: {step.get('next_goal')}")
unique_steps = new_steps
status = details.get('status')
if status in ['finished', 'failed', 'stopped']:
return details
time.sleep(poll_interval)
# Timeout occurred
self.stop_task(task_id)
raise TimeoutError(f"Task {task_id} did not complete within {timeout} seconds")
def stop_task(self, task_id: str) -> Dict[str, Any]:
"""Stop a running task"""
response = requests.put(
f'{self.base_url}/stop-task?task_id={task_id}',
headers=self.headers
)
response.raise_for_status()
return response.json()
Step 4: Create X402 Agent
Now, implement a comprehensive X402 agent that ties everything together:
import json
import requests
import os
import time
from typing import Dict, Any, Optional, Union
from pydantic import BaseModel
# Import our clients
from browser_use_client import BrowserUseClient
from helio_pay_client import HelioPayClient
# Define models for structured data
class PaymentDetails(BaseModel):
payment_request_id: str
amount: str
currency: str
recipient: str
description: Optional[str] = None
requires_browser_interaction: bool = False
resource_id: Optional[str] = None
class PaymentResult(BaseModel):
status: str # "success", "failed", "pending"
transaction_id: Optional[str] = None
error: Optional[str] = None
details: Optional[Dict[str, Any]] = None
class Resource(BaseModel):
url: str
content_type: str
data: Any
payment_required: bool = False
payment_details: Optional[PaymentDetails] = None
class X402Agent:
"""Agent for handling X402 protocol interactions"""
def __init__(self, browser_use_api_key: str, helio_public_key: str, helio_secret_key: str,
solana_wallet_address: str = None):
self.browser_client = BrowserUseClient(browser_use_api_key)
self.helio_client = HelioPayClient(helio_public_key, helio_secret_key)
self.solana_wallet_address = solana_wallet_address
self.payment_cache = {} # Cache payment confirmations
def request_resource(self, url: str, headers: Dict[str, str] = None) -> Resource:
"""Request a resource that might require payment via X402"""
if headers is None:
headers = {}
print(f"Requesting resource at {url}")
response = requests.get(url, headers=headers)
# Create a base resource object
resource = Resource(
url=url,
content_type=response.headers.get('Content-Type', 'text/plain'),
data=response.text if response.status_code != 402 else None,
payment_required=False
)
# Check for 402 Payment Required
if response.status_code == 402:
print(f"Resource at {url} requires payment (HTTP 402)")
# Extract payment details from response headers
payment_header = response.headers.get('X-Payment-Details')
if not payment_header:
raise ValueError("Received 402 status but no payment details in headers")
try:
payment_info = json.loads(payment_header)
payment_details = PaymentDetails(**payment_info)
resource.payment_required = True
resource.payment_details = payment_details
print(f"Payment details: {payment_details.amount} {payment_details.currency} "
f"to {payment_details.recipient}")
except Exception as e:
raise ValueError(f"Failed to parse payment details: {e}")
return resource
def process_payment(self, payment_details: PaymentDetails) -> PaymentResult:
"""Process a payment according to X402 protocol"""
print(f"Processing payment: {payment_details.amount} {payment_details.currency}")
try:
# Create charge via Helio API
charge_result = self.helio_client.create_charge(
payment_request_id=payment_details.payment_request_id,
request_amount=payment_details.amount,
customer_details={
"additionalJSON": json.dumps({
"resource_id": payment_details.resource_id,
"x402_request": True,
"timestamp": time.time()
})
}
)
charge_id = charge_result.get('id')
charge_url = charge_result.get('pageUrl')
print(f"Created charge: {charge_id}")
print(f"Charge URL: {charge_url}")
# If browser interaction is required, use Browser-Use
if payment_details.requires_browser_interaction:
payment_url = charge_url
print(f"Payment requires browser interaction, navigating to {payment_url}")
# Create a browser automation task to complete the payment
task_id = self.browser_client.create_payment_automation(
payment_url=payment_url,
wallet_address=self.solana_wallet_address
)
print(f"Created Browser-Use task: {task_id}")
# Wait for the payment process to complete
task_result = self.browser_client.wait_for_completion(task_id)
if task_result.get('status') == 'finished':
output = task_result.get('output', '{}')
try:
payment_output = json.loads(output)
if payment_output.get('status') == 'completed':
print("Browser automation task completed successfully")
return PaymentResult(
status="success",
transaction_id=charge_id,
details=payment_output
)
else:
print(f"Payment not completed: {payment_output.get('error_message')}")
return PaymentResult(
status="failed",
error=payment_output.get('error_message', 'Unknown error'),
details=payment_output
)
except json.JSONDecodeError:
# Handle unstructured output
if "success" in output.lower() or "confirmed" in output.lower():
return PaymentResult(
status="success",
transaction_id=charge_id,
details={"raw_output": output}
)
else:
return PaymentResult(
status="failed",
error="Could not confirm payment completion",
details={"raw_output": output}
)
else:
print(f"Browser automation failed: {task_result}")
return PaymentResult(
status="failed",
error="Browser automation failed",
details=task_result
)
# For non-browser payments, assume success (in a real implementation, you'd check the status)
return PaymentResult(
status="success",
transaction_id=charge_id
)
except Exception as e:
print(f"Payment failed: {e}")
return PaymentResult(
status="failed",
error=str(e)
)
def access_resource_with_payment(self, url: str) -> Resource:
"""Access a resource, handling payment if required"""
# First, try to access without payment
resource = self.request_resource(url)
# If payment is required, process it
if resource.payment_required and resource.payment_details:
print("Resource requires payment. Initiating payment flow...")
payment_result = self.process_payment(resource.payment_details)
if payment_result.status == "success":
print(f"Payment successful. Transaction ID: {payment_result.transaction_id}")
# Cache the successful payment
self.payment_cache[url] = payment_result.transaction_id
# Retry the request with payment confirmation header
headers = {
'X-Payment-Confirmation': payment_result.transaction_id,
'X-Transaction-ID': payment_result.transaction_id
}
# Request the resource again, now with payment confirmation
return self.request_resource(url, headers)
else:
print(f"Payment failed: {payment_result.error}")
# Return the original resource with payment_required still set to True
return resource
return resource
def batch_process_resources(self, urls: list[str]) -> Dict[str, Resource]:
"""Process multiple resources in batch, optimizing for payment efficiency"""
results = {}
for url in urls:
# Check if we already paid for this domain
domain = url.split('//')[-1].split('/')[0]
# Try to use cached payment tokens for the same domain
existing_payment = None
for cached_url, payment_id in self.payment_cache.items():
if domain in cached_url:
existing_payment = payment_id
break
if existing_payment:
# Try using the existing payment token
headers = {'X-Payment-Confirmation': existing_payment}
resource = self.request_resource(url, headers)
# If it worked, add to results
if not resource.payment_required:
results[url] = resource
continue
# Otherwise process normally
results[url] = self.access_resource_with_payment(url)
return results
Step 5: Connect Everything in a Complete Example
Here's a complete example that ties everything together:
import os
import json
import argparse
from x402_agent import X402Agent
def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description='X402 Agent Demo')
parser.add_argument('--url', type=str, help='URL of the resource to access')
parser.add_argument('--batch', action='store_true', help='Process URLs in batch mode')
parser.add_argument('--urls-file', type=str, help='File containing URLs to process in batch')
args = parser.parse_args()
# Load configuration
browser_use_api_key = os.getenv('BROWSER_USE_API_KEY')
helio_public_key = os.getenv('HELIO_PUBLIC_KEY')
helio_secret_key = os.getenv('HELIO_SECRET_KEY')
solana_wallet = os.getenv('SOLANA_WALLET_ADDRESS')
# Initialize the X402 agent
agent = X402Agent(
browser_use_api_key=browser_use_api_key,
helio_public_key=helio_public_key,
helio_secret_key=helio_secret_key,
solana_wallet_address=solana_wallet
)
if args.batch and args.urls_file:
# Batch process mode
with open(args.urls_file, 'r') as f:
urls = [line.strip() for line in f.readlines() if line.strip()]
print(f"Processing {len(urls)} URLs in batch mode...")
results = agent.batch_process_resources(urls)
# Output results summary
print("\nResults Summary:")
for url, resource in results.items():
if resource.payment_required:
print(f"❌ {url}: Payment required but not completed")
else:
print(f"✅ {url}: Successfully accessed ({len(str(resource.data))} chars)")
# Save content to file
filename = url.split('/')[-1] or 'resource'
with open(f"{filename}.data", 'w') as f:
f.write(str(resource.data))
elif args.url:
# Single URL mode
try:
resource = agent.access_resource_with_payment(args.url)
if not resource.payment_required:
print(f"Successfully accessed resource: {args.url}")
print(f"Content Type: {resource.content_type}")
print(f"Data length: {len(str(resource.data))} characters")
# Ask if user wants to save the content
save = input("Save content to file? (y/n): ").lower() == 'y'
if save:
filename = input("Enter filename (default: resource.data): ") or "resource.data"
with open(filename, 'w') as f:
f.write(str(resource.data))
print(f"Content saved to {filename}")
else:
print(f"Failed to access resource: Payment required but not completed")
except Exception as e:
print(f"Error accessing resource: {e}")
else:
print("Please provide a URL with --url or use --batch with --urls-file")
if __name__ == "__main__":
main()
Advanced Use Cases
AI Research Assistant
An AI agent that autonomously searches and pays for premium academic content:
class ResearchAssistant:
def __init__(self, x402_agent):
self.agent = x402_agent
self.research_cache = {}
async def research_topic(self, topic, max_budget=5.0):
"""Research a specialized topic with a maximum budget in USDC"""
print(f"Researching topic: {topic} (budget: ${max_budget} USDC)")
# Step 1: Identify relevant premium databases
databases = [
"https://premium-papers.example.com/api/search",
"https://academic-data.example.org/query",
"https://research-central.example.net/papers"
]
search_results = []
total_spent = 0.0
# Step 2: Query each database with topic
for db_url in databases:
if total_spent >= max_budget:
print(f"Budget limit reached (${total_spent}/${max_budget})")
break
query_url = f"{db_url}?q={topic}"
print(f"Querying {db_url}...")
# Step 3: Make HTTP request and handle 402 responses
resource = self.agent.access_resource_with_payment(query_url)
if not resource.payment_required:
print(f"Successfully accessed {db_url}")
try:
data = json.loads(resource.data)
papers = data.get('papers', [])
search_results.extend(papers)
# Track spending if payment was made
if query_url in self.agent.payment_cache:
# Assuming 0.01 USDC per query
total_spent += 0.01
except:
print(f"Error parsing results from {db_url}")
else:
print(f"Failed to access {db_url} - payment required but not completed")
# Step 4: For each promising paper, get full text if needed
full_papers = []
for paper in search_results[:5]: # Limit to top 5 papers
if total_spent >= max_budget:
print(f"Budget limit reached (${total_spent}/${max_budget})")
break
paper_url = paper.get('full_text_url')
if paper_url:
print(f"Accessing full paper: {paper.get('title')}")
# Step 5: Access the paper, handling payment if required
paper_resource = self.agent.access_resource_with_payment(paper_url)
if not paper_resource.payment_required:
print(f"Successfully accessed paper: {paper.get('title')}")
full_papers.append({
'title': paper.get('title'),
'authors': paper.get('authors'),
'abstract': paper.get('abstract'),
'full_text': paper_resource.data
})
# Track spending
if paper_url in self.agent.payment_cache:
# Assuming papers cost 0.05 USDC
total_spent += 0.05
else:
print(f"Failed to access paper: {paper.get('title')}")
# Step 6: Compile final research results
research_report = {
'topic': topic,
'papers_found': len(search_results),
'papers_accessed': len(full_papers),
'full_papers': full_papers,
'total_spent': total_spent
}
# Cache results for future use
self.research_cache[topic] = research_report
return research_report
Automated E-commerce with Inventory Management
Implement inventory management that can autonomously reorder supplies:
class AutomatedInventoryManager:
def __init__(self, x402_agent, inventory_api_url, reorder_threshold=10):
self.agent = x402_agent
self.inventory_api = inventory_api_url
self.threshold = reorder_threshold
self.supplier_catalog = {}
self.order_history = []
async def check_inventory_levels(self):
"""Monitor inventory and reorder when needed"""
# Step 1: Get current inventory levels
inventory_url = f"{self.inventory_api}/current"
inventory = requests.get(inventory_url).json()
low_stock_items = []
for item, details in inventory.items():
if details['quantity'] <= self.threshold:
low_stock_items.append({
'sku': item,
'name': details['name'],
'current_quantity': details['quantity'],
'reorder_quantity': details['reorder_amount']
})
if not low_stock_items:
print("All inventory levels are sufficient")
return
print(f"Found {len(low_stock_items)} items below threshold")
# Step 2: For each low stock item, find suppliers
for item in low_stock_items:
print(f"Finding suppliers for {item['name']} (SKU: {item['sku']})")
# Use cached supplier info or find new suppliers
if item['sku'] in self.supplier_catalog:
suppliers = self.supplier_catalog[item['sku']]
else:
# Step 3: Query supplier marketplace
supplier_url = f"https://supplier-marketplace.example.com/api/find?sku={item['sku']}"
# Use X402 agent to access the marketplace (might require payment)
resource = self.agent.access_resource_with_payment(supplier_url)
if not resource.payment_required:
suppliers = json.loads(resource.data)
# Cache supplier information
self.supplier_catalog[item['sku']] = suppliers
else:
print(f"Could not access supplier information for {item['name']}")
continue
if not suppliers:
print(f"No suppliers found for {item['name']}")
continue
# Select best supplier based on price, availability, rating
best_supplier = min(suppliers, key=lambda s: float(s['price']))
print(f"Selected supplier: {best_supplier['name']} for {item['name']}")
# Step 4: Place order using X402 protocol
order_url = f"https://supplier-api.example.com/order"
order_payload = {
'supplier_id': best_supplier['id'],
'item_sku': item['sku'],
'quantity': item['reorder_quantity'],
'shipping_address': "123 Warehouse Ave, Storage City, SC 12345"
}
# Prepare the order (this might return a 402 status requiring payment)
headers = {'Content-Type': 'application/json'}
order_response = requests.post(order_url, json=order_payload, headers=headers)
if order_response.status_code == 402:
# Handle payment required response
payment_details = json.loads(order_response.headers.get('X-Payment-Details'))
# Process payment
payment_result = self.agent.process_payment(payment_details)
if payment_result.status == "success":
# Confirm order with payment token
order_headers = {
'Content-Type': 'application/json',
'X-Payment-Confirmation': payment_result.transaction_id
}
final_response = requests.post(order_url, json=order_payload, headers=order_headers)
if final_response.status_code == 200:
order_confirmation = final_response.json()
# Log successful order
order_record = {
'timestamp': time.time(),
'item': item,
'supplier': best_supplier,
'quantity': item['reorder_quantity'],
'total_cost': float(best_supplier['price']) * item['reorder_quantity'],
'order_id': order_confirmation['order_id'],
'estimated_delivery': order_confirmation['estimated_delivery']
}
self.order_history.append(order_record)
print(f"Successfully ordered {item['reorder_quantity']} units of {item['name']}")
print(f"Order ID: {order_confirmation['order_id']}")
print(f"Estimated delivery: {order_confirmation['estimated_delivery']}")
# Update inventory system
update_url = f"{self.inventory_api}/update-on-order"
update_payload = {
'sku': item['sku'],
'on_order': item['reorder_quantity'],
'order_id': order_confirmation['order_id']
}
requests.post(update_url, json=update_payload)
else:
print(f"Order failed even after payment: {final_response.text}")
else:
print(f"Payment failed: {payment_result.error}")
elif order_response.status_code == 200:
# Order accepted without payment (rare, but possible)
order_confirmation = order_response.json()
# Log successful order
order_record = {
'timestamp': time.time(),
'item': item,
'supplier': best_supplier,
'quantity': item['reorder_quantity'],
'total_cost': float(best_supplier['price']) * item['reorder_quantity'],
'order_id': order_confirmation['order_id'],
'estimated_delivery': order_confirmation['estimated_delivery']
}
self.order_history.append(order_record)
print(f"Successfully ordered {item['reorder_quantity']} units of {item['name']}")
print(f"Order ID: {order_confirmation['order_id']}")
print(f"Estimated delivery: {order_confirmation['estimated_delivery']}")
# Update inventory system
update_url = f"{self.inventory_api}/update-on-order"
update_payload = {
'sku': item['sku'],
'on_order': item['reorder_quantity'],
'order_id': order_confirmation['order_id']
}
requests.post(update_url, json=update_payload)
else:
print(f"Order failed: {order_response.text}")
Real-time Data Service and API Provider
Create a service that offers access to real-time financial data using the X402 protocol:
// X402 Financial Data API Server
const express = require('express');
const { HelioPayAPI } = require('./helio-pay-client');
const { SolanaBlockchain } = require('./solana-client');
const { RealTimeDataProvider } = require('./data-provider');
const app = express();
const PORT = process.env.PORT || 3000;
// Initialize our services
const helioPay = new HelioPayAPI({
publicKey: process.env.HELIO_PUBLIC_KEY,
secretKey: process.env.HELIO_SECRET_KEY
});
const solana = new SolanaBlockchain({
rpcUrl: process.env.SOLANA_RPC_URL,
walletPrivateKey: process.env.WALLET_PRIVATE_KEY
});
const dataProvider = new RealTimeDataProvider();
// Track payments and data usage
const paymentRecords = {};
// Middleware to parse JSON
app.use(express.json());
// Root endpoint with service information
app.get('/', (req, res) => {
res.json({
service: 'X402 Financial Data API',
endpoints: [
{ path: '/api/market/price/:symbol', description: 'Get real-time price for a ticker symbol' },
{ path: '/api/market/ohlc/:symbol', description: 'Get OHLC data for a ticker symbol' },
{ path: '/api/market/depth/:symbol', description: 'Get order book depth for a symbol' },
],
pricing: {
price_endpoint: '0.001 USDC per call',
ohlc_endpoint: '0.005 USDC per call',
depth_endpoint: '0.01 USDC per call'
},
payment_method: 'X402 Protocol via Helio Pay on Solana'
});
});
// Real-time price endpoint with X402 payment requirement
app.get('/api/market/price/:symbol', async (req, res) => {
const symbol = req.params.symbol.toUpperCase();
// Check for payment confirmation header
const paymentToken = req.headers['x-payment-confirmation'];
if (paymentToken && verifyPayment(paymentToken, 'price', symbol)) {
try {
// Get real-time price data
const priceData = await dataProvider.getRealTimePrice(symbol);
// Record usage
recordUsage(paymentToken, 'price', symbol);
// Return the data
return res.json({
symbol,
price: priceData.price,
currency: priceData.currency,
timestamp: priceData.timestamp,
provider: priceData.provider
});
} catch (error) {
return res.status(500).json({ error: 'Failed to fetch price data' });
}
}
// If no valid payment token, return 402 Payment Required
const paymentDetails = {
payment_request_id: process.env.PRICE_ENDPOINT_PAYLINK_ID,
amount: '0.001', // 0.001 USDC
currency: 'USDC',
recipient: process.env.MERCHANT_WALLET,
description: `Real-time price data for ${symbol}`,
requires_browser_interaction: true,
resource_path: `/api/market/price/${symbol}`
};
// Return 402 status with payment details
res.status(402)
.header('X-Payment-Details', JSON.stringify(paymentDetails))
.json({
message: 'Payment required to access this data',
payment_details: paymentDetails
});
});
// OHLC data endpoint with X402 payment requirement
app.get('/api/market/ohlc/:symbol', async (req, res) => {
const symbol = req.params.symbol.toUpperCase();
const interval = req.query.interval || '1h'; // Default to 1-hour intervals
// Check for payment confirmation header
const paymentToken = req.headers['x-payment-confirmation'];
if (paymentToken && verifyPayment(paymentToken, 'ohlc', symbol)) {
try {
// Get OHLC data
const ohlcData = await dataProvider.getOHLCData(symbol, interval);
// Record usage
recordUsage(paymentToken, 'ohlc', symbol);
// Return the data
return res.json({
symbol,
interval,
data: ohlcData,
timestamp: Date.now(),
count: ohlcData.length
});
} catch (error) {
return res.status(500).json({ error: 'Failed to fetch OHLC data' });
}
}
// If no valid payment token, return 402 Payment Required
const paymentDetails = {
payment_request_id: process.env.OHLC_ENDPOINT_PAYLINK_ID,
amount: '0.005', // 0.005 USDC
currency: 'USDC',
recipient: process.env.MERCHANT_WALLET,
description: `OHLC data for ${symbol} (${interval})`,
requires_browser_interaction: true,
resource_path: `/api/market/ohlc/${symbol}?interval=${interval}`
};
// Return 402 status with payment details
res.status(402)
.header('X-Payment-Details', JSON.stringify(paymentDetails))
.json({
message: 'Payment required to access this data',
payment_details: paymentDetails
});
});
// Helper functions for payment verification and usage tracking
function verifyPayment(token, endpoint, symbol) {
if (!paymentRecords[token]) {
return false;
}
const record = paymentRecords[token];
// Check if payment is for the right endpoint
if (record.endpoint !== endpoint) {
return false;
}
// Check if payment is still valid (15 minute window)
const now = Date.now();
if (now - record.timestamp > 15 * 60 * 1000) {
return false;
}
return true;
}
function recordUsage(token, endpoint, symbol) {
if (paymentRecords[token]) {
paymentRecords[token].usage.push({
timestamp: Date.now(),
endpoint,
symbol
});
}
}
// Webhook endpoint for Helio Pay notifications
app.post('/webhook/helio-payment', (req, res) => {
const { event, transactionObject } = req.body;
if (event === 'CREATED' && transactionObject &&
transactionObject.meta.transactionStatus === 'SUCCESS') {
try {
// Extract custom data from additionalJSON if available
let customData = {};
if (transactionObject.meta.customerDetails &&
transactionObject.meta.customerDetails.additionalJSON) {
customData = JSON.parse(transactionObject.meta.customerDetails.additionalJSON);
}
// Determine which endpoint this payment is for
let endpoint = 'unknown';
if (transactionObject.paylinkId === process.env.PRICE_ENDPOINT_PAYLINK_ID) {
endpoint = 'price';
} else if (transactionObject.paylinkId === process.env.OHLC_ENDPOINT_PAYLINK_ID) {
endpoint = 'ohlc';
} else if (transactionObject.paylinkId === process.env.DEPTH_ENDPOINT_PAYLINK_ID) {
endpoint = 'depth';
}
// Record the payment
paymentRecords[transactionObject.id] = {
id: transactionObject.
Last updated