Finding a company’s official website from just its name sounds simple, but it’s one of the most time-consuming tasks in B2B data enrichment. You’ve probably spent hours manually searching for websites, cross-referencing company names with domains, and wondering if there’s a better way. Well, there is.
Python developers now have access to powerful Company Name to Domain APIs that can transform this tedious process into a single line of code. But here’s the thing—not all APIs are created equal, and choosing the wrong one could leave you with outdated URLs, broken links, or worse, incorrect data that damages your outreach efforts.
What Is a Company Name to Domain API?
A Company Name to Domain API is a RESTful web service that accepts a company name as input and returns the company’s official website URL. Think of it as a digital phone book, but instead of matching names to phone numbers, it matches company names to their primary domains.
The magic happens through sophisticated algorithms that cross-reference multiple data sources, validate domain ownership, and return the most accurate website URL with a confidence score. For Python developers, this means you can automate lead generation, CRM data enrichment, and market research with just a few lines of code.
Modern APIs like CUFinder’s Company Name to Domain service maintain databases of 85M+ companies, refreshed daily to ensure you’re getting current, active websites—not abandoned domains or redirects to parking pages.
Why Python Developers Choose APIs Over Manual Research
Manual domain research is a productivity killer. When you’re building applications that need to process hundreds or thousands of company names, spending 2-3 minutes per lookup isn’t just inefficient—it’s impossible.
Python’s strength lies in automation, and finding the best Company Name to Domain finder becomes crucial when you’re handling bulk operations. A quality API can process requests in milliseconds, handle rate limiting gracefully, and provide structured JSON responses that integrate seamlessly with pandas DataFrames.
Consider this scenario: You’re building a lead scoring system that needs to validate 500 companies daily. Manual research would take 16+ hours. With a Python API integration, the same task completes in under 5 minutes.
The real advantage isn’t just speed—it’s consistency. APIs eliminate human error, provide confidence scores for each match, and maintain audit trails for compliance requirements.
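To make that concrete, here's a minimal sketch of loading lookup results into a pandas DataFrame and filtering on confidence. The `domain` and `confidence_level` fields are assumptions that mirror the response fields used by the code later in this article:

```python
import pandas as pd

# Hypothetical lookup results; the exact response shape is an
# assumption for illustration, not an authoritative schema.
results = [
    {"company_name": "Acme Corp", "domain": "acme.com", "confidence_level": 92},
    {"company_name": "Globex", "domain": "globex.com", "confidence_level": 48},
    {"company_name": "Initech", "domain": None, "confidence_level": 0},
]

df = pd.DataFrame(results)

# Keep only matches above a configurable confidence threshold.
high_confidence = df[df["confidence_level"] >= 70]
print(high_confidence["domain"].tolist())  # ['acme.com']
```

Because every result carries a confidence score, a threshold like this becomes a one-line quality gate instead of a manual review step.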

Setting Up CUFinder’s Company Name to Domain API in Python
Let’s dive into the practical implementation. CUFinder’s API offers one of the most reliable solutions for finding company websites based on names, with 98% accuracy rates and real-time validation.
First, you’ll need to install the required Python libraries:
pip install requests pandas python-dotenv
Here’s a basic implementation that gets you started:
import os
import time

import requests
from dotenv import load_dotenv

# Load environment variables
load_dotenv()


class CompanyDomainFinder:
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.cufinder.io/v2/cuf"
        self.headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'x-api-key': self.api_key
        }

    def find_domain(self, company_name):
        """Find domain for a single company name."""
        # Validate input before spending a request (the unit tests
        # later in this article rely on this check)
        if not company_name or not isinstance(company_name, str):
            raise ValueError("Company name must be a non-empty string")

        data = {'company_name': company_name}
        try:
            response = requests.post(
                self.base_url,
                headers=self.headers,
                data=data
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {'error': f'Request failed: {str(e)}'}

    def batch_find_domains(self, company_names, delay=0.1):
        """Find domains for multiple companies with rate limiting."""
        results = []
        for company in company_names:
            result = self.find_domain(company)
            results.append({
                'company_name': company,
                'result': result
            })
            time.sleep(delay)  # Respect rate limits
        return results


# Usage example
api_key = os.getenv('CUFINDER_API_KEY')
finder = CompanyDomainFinder(api_key)

# Single company lookup
result = finder.find_domain('Apple Inc')
print(result)
This implementation handles the core functionality while respecting API rate limits and providing error handling. The response includes the domain URL, confidence level, and remaining credit count.
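For orientation, a successful response looks roughly like the structure below. The field names are inferred from the fields this article's code reads; treat them as an assumption and consult the API documentation for the authoritative schema:

```python
# Assumed shape of a successful response; actual field names may differ.
sample_response = {
    "status": 1,
    "data": {
        "domain": "https://apple.com",
        "confidence_level": 94,
        "query": "Apple Inc",
        "credit_count": 9997,
    },
}

# Downstream code typically reads the nested fields defensively:
data = sample_response.get("data", {})
print(data.get("domain"), data.get("confidence_level"))
```

Reading nested fields with `.get()` rather than direct indexing keeps your pipeline alive when a lookup fails and the `data` key is absent.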
Handling Bulk Operations and Data Processing
When you’re processing large datasets, efficiency becomes critical. The ability to get website URLs from company names in bulk can transform how you approach data enrichment projects.
Here’s an advanced implementation that works with pandas DataFrames and handles common edge cases:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd


class BulkDomainProcessor:
    def __init__(self, api_key, max_workers=5):
        self.finder = CompanyDomainFinder(api_key)
        self.max_workers = max_workers

    def process_dataframe(self, df, company_column='company_name'):
        """Process a pandas DataFrame with company names."""
        companies = df[company_column].dropna().unique()

        # Process in batches to manage memory and rate limits
        results = []
        batch_size = 50
        for i in range(0, len(companies), batch_size):
            batch = companies[i:i + batch_size]
            batch_results = self._process_batch(batch)
            results.extend(batch_results)

            # Progress indicator
            print(f"Processed {min(i + batch_size, len(companies))}/{len(companies)} companies")

        # Convert results to a DataFrame and merge back with the original data
        results_df = pd.DataFrame(results)
        return df.merge(results_df, left_on=company_column, right_on='input_company', how='left')

    def _process_batch(self, companies):
        """Process a batch of companies with threading."""
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all requests
            future_to_company = {
                executor.submit(self.finder.find_domain, company): company
                for company in companies
            }

            # Collect results as they complete
            for future in as_completed(future_to_company):
                company = future_to_company[future]
                try:
                    result = future.result()
                    results.append(self._parse_result(company, result))
                except Exception as e:
                    results.append({
                        'input_company': company,
                        'domain': None,
                        'confidence_level': 0,
                        'error': str(e)
                    })

                # Rate limiting
                time.sleep(0.1)
        return results

    def _parse_result(self, company, api_result):
        """Parse an API response into a structured format."""
        if 'error' in api_result:
            return {
                'input_company': company,
                'domain': None,
                'confidence_level': 0,
                'error': api_result['error']
            }

        data = api_result.get('data', {})
        return {
            'input_company': company,
            'domain': data.get('domain'),
            'confidence_level': data.get('confidence_level', 0),
            'credits_remaining': data.get('credit_count'),
            'error': None
        }


# Usage with real data
processor = BulkDomainProcessor(api_key)

# Load your dataset
df = pd.read_csv('companies.csv')
enriched_df = processor.process_dataframe(df, 'company_name')

# Save results
enriched_df.to_csv('enriched_companies.csv', index=False)
This advanced implementation uses threading to process multiple requests concurrently while respecting rate limits. It also handles common data quality issues like duplicate company names and missing values.
Error Handling and Best Practices
Real-world API integration requires robust error handling. Network timeouts, rate-limit errors, and invalid company names all need to be handled gracefully.
import logging

from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


class RobustDomainFinder(CompanyDomainFinder):
    def __init__(self, api_key):
        super().__init__(api_key)
        self.setup_logging()

    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        # Only retry transient network errors; validation errors
        # should surface immediately, and the final exception should
        # be re-raised rather than wrapped in a RetryError
        retry=retry_if_exception_type(requests.exceptions.RequestException),
        reraise=True
    )
    def find_domain_with_retry(self, company_name):
        """Find domain with automatic retries for transient failures."""
        self.logger.info(f"Looking up domain for: {company_name}")

        # Input validation
        if not company_name or not isinstance(company_name, str):
            raise ValueError("Company name must be a non-empty string")

        # Clean input
        cleaned_name = self._clean_company_name(company_name)

        try:
            result = self.find_domain(cleaned_name)

            # Check for API-level errors
            if 'error' in result:
                self.logger.error(f"API error for {company_name}: {result['error']}")
                return result

            # Validate response structure
            if 'data' not in result:
                raise ValueError("Invalid API response structure")

            confidence = result['data'].get('confidence_level', 0)
            domain = result['data'].get('domain')

            if confidence < 50:  # Configurable confidence threshold
                self.logger.warning(f"Low confidence ({confidence}%) for {company_name}")

            self.logger.info(f"Successfully found domain for {company_name}: {domain}")
            return result

        except requests.exceptions.Timeout:
            self.logger.error(f"Timeout for {company_name}")
            raise
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:  # Rate limit
                self.logger.warning("Rate limit hit, backing off...")
                raise
            self.logger.error(f"HTTP error for {company_name}: {e}")
            return {'error': f'HTTP {e.response.status_code}'}

    def _clean_company_name(self, name):
        """Clean and normalize company names for better matching."""
        # Remove common suffixes that might interfere with matching
        suffixes = [' Inc', ' Inc.', ' LLC', ' Ltd', ' Ltd.', ' Corp', ' Corp.']
        cleaned = name.strip()
        for suffix in suffixes:
            if cleaned.endswith(suffix):
                cleaned = cleaned[:-len(suffix)].strip()
                break
        return cleaned
The retry decorator automatically handles transient failures, while input validation prevents common API errors. Logging provides visibility into the enrichment process, which is crucial for debugging production issues.
Integration with Popular Python Data Libraries
Python’s data ecosystem makes API integration incredibly powerful. Here’s how to integrate Company Name to Domain lookups with popular libraries:
# Integration with SQLAlchemy for database operations
import pandas as pd
from sqlalchemy import create_engine, text


class DatabaseDomainEnricher:
    def __init__(self, api_key, db_connection_string):
        self.finder = RobustDomainFinder(api_key)
        self.engine = create_engine(db_connection_string)

    def enrich_companies_table(self, table_name, company_column='company_name'):
        """Enrich companies directly in the database."""
        # Read companies that need enrichment
        query = f"""
            SELECT id, {company_column}
            FROM {table_name}
            WHERE website_url IS NULL
              AND {company_column} IS NOT NULL
            LIMIT 1000
        """
        df = pd.read_sql(query, self.engine)

        # Enrich domains
        enriched_data = []
        for _, row in df.iterrows():
            result = self.finder.find_domain_with_retry(row[company_column])
            if 'data' in result:
                enriched_data.append({
                    'id': row['id'],
                    'website_url': result['data'].get('domain'),
                    'domain_confidence': result['data'].get('confidence_level', 0)
                })

        # Bulk update database
        if enriched_data:
            update_df = pd.DataFrame(enriched_data)
            update_df.to_sql('temp_domain_updates', self.engine, if_exists='replace', index=False)

            # Execute bulk update
            update_query = f"""
                UPDATE {table_name}
                SET website_url = t.website_url,
                    domain_confidence = t.domain_confidence
                FROM temp_domain_updates t
                WHERE {table_name}.id = t.id
            """
            with self.engine.connect() as conn:
                conn.execute(text(update_query))
                conn.commit()


# Integration with popular CRM export formats
def enrich_hubspot_export(csv_file, output_file):
    """Enrich a HubSpot company export with domains."""
    df = pd.read_csv(csv_file)

    # HubSpot uses the 'Name' column for company names
    if 'Name' not in df.columns:
        raise ValueError("HubSpot export must contain 'Name' column")

    processor = BulkDomainProcessor(api_key)
    enriched_df = processor.process_dataframe(df, 'Name')

    # Map back to HubSpot format
    enriched_df['Website URL'] = enriched_df['domain']
    enriched_df.to_csv(output_file, index=False)
    return enriched_df
This database integration allows you to enrich large datasets without loading everything into memory, while the CRM integration handles common export formats automatically.
Performance Optimization and Caching
For production applications processing thousands of company names, performance optimization becomes crucial. Caching frequently requested domains and optimizing API calls can dramatically improve response times.
import hashlib
import json
from datetime import timedelta

import redis


class CachedDomainFinder:
    def __init__(self, api_key, redis_host='localhost', redis_port=6379):
        self.finder = RobustDomainFinder(api_key)
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.cache_ttl = timedelta(days=30)  # Cache for 30 days

    def _get_cache_key(self, company_name):
        """Generate a consistent cache key for a company name."""
        normalized = company_name.lower().strip()
        return f"domain:{hashlib.md5(normalized.encode()).hexdigest()}"

    def find_domain_cached(self, company_name):
        """Find domain with Redis caching."""
        cache_key = self._get_cache_key(company_name)

        # Try cache first
        cached_result = self.redis_client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)

        # Cache miss - call API
        result = self.finder.find_domain_with_retry(company_name)

        # Cache successful, high-confidence results
        if 'data' in result and result['data'].get('confidence_level', 0) > 70:
            self.redis_client.setex(
                cache_key,
                self.cache_ttl,
                json.dumps(result)
            )
        return result

    def bulk_find_with_cache(self, company_names):
        """Bulk lookup with intelligent caching."""
        results = {}
        uncached_companies = []

        # Check cache for all companies
        for company in company_names:
            cached = self.redis_client.get(self._get_cache_key(company))
            if cached:
                results[company] = json.loads(cached)
            else:
                uncached_companies.append(company)

        # Process uncached companies
        if uncached_companies:
            processor = BulkDomainProcessor(self.finder.api_key)
            uncached_results = processor._process_batch(uncached_companies)

            for result in uncached_results:
                company = result['input_company']
                api_result = self._format_for_cache(result)
                results[company] = api_result

                # Cache if high confidence
                if result.get('confidence_level', 0) > 70:
                    self.redis_client.setex(
                        self._get_cache_key(company),
                        self.cache_ttl,
                        json.dumps(api_result)
                    )
        return results

    def _format_for_cache(self, processed_result):
        """Format a processed result for cache storage."""
        if processed_result.get('error'):
            return {'error': processed_result['error']}
        return {
            'status': 1,
            'data': {
                'domain': processed_result.get('domain'),
                'confidence_level': processed_result.get('confidence_level'),
                'query': processed_result.get('input_company')
            }
        }


# Usage with caching
cached_finder = CachedDomainFinder(api_key)

# This will be fast on subsequent calls
companies = ['Apple Inc', 'Microsoft Corporation', 'Google LLC']
results = cached_finder.bulk_find_with_cache(companies)
The caching layer reduces API calls by up to 80% for frequently requested companies, while maintaining data freshness through configurable TTL values.
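If Redis isn't part of your stack, a process-local cache can capture much of the same benefit for single-run scripts. This sketch memoizes a hypothetical lookup function with the standard library's `functools.lru_cache`; the lookup body is a stand-in for a real API call, so the domain format here is purely illustrative:

```python
from functools import lru_cache

call_count = 0  # Track how many "API calls" actually go out.


@lru_cache(maxsize=10_000)
def find_domain_memoized(company_name: str) -> str:
    """Stand-in for a real API lookup; memoized per exact name."""
    global call_count
    call_count += 1
    return f"{company_name.lower().replace(' ', '')}.example"


# Repeated lookups for the same name hit the cache, not the API.
for name in ["Acme Corp", "Acme Corp", "Globex", "Acme Corp"]:
    find_domain_memoized(name)

print(call_count)  # 2 — only unique names trigger a call
```

Unlike the Redis layer, this cache evaporates when the process exits and isn't shared across workers, so it suits one-off enrichment scripts rather than long-running services.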
Real-World Use Cases and Examples
Let’s explore practical applications where Python developers commonly use Company Name to Domain APIs. These examples show real implementation patterns you can adapt for your specific needs.
Lead Generation Pipeline
class LeadEnrichmentPipeline:
    def __init__(self, api_key):
        self.domain_finder = CachedDomainFinder(api_key)
        self.email_finder = EmailFinder(api_key)  # Assuming an additional email-finding service

    def enrich_lead_list(self, leads_csv):
        """Complete lead enrichment pipeline."""
        df = pd.read_csv(leads_csv)

        # Step 1: Find company domains
        enriched = self.domain_finder.bulk_find_with_cache(df['company_name'].tolist())

        # Step 2: Extract domains and add to DataFrame
        df['company_website'] = df['company_name'].map(
            lambda x: enriched.get(x, {}).get('data', {}).get('domain')
        )

        # Step 3: Find contact emails using domains
        df['contact_email'] = df.apply(
            lambda row: self._find_contact_email(row['company_website']),
            axis=1
        )

        # Step 4: Score leads based on website presence and confidence
        df['lead_score'] = df.apply(self._calculate_lead_score, axis=1)
        return df.sort_values('lead_score', ascending=False)

    def _find_contact_email(self, domain):
        """Placeholder lookup against the assumed email-finding service."""
        if not domain:
            return None
        return self.email_finder.find_email(domain)  # Hypothetical method

    def _calculate_lead_score(self, row):
        """Calculate lead score based on data quality."""
        score = 0
        if row['company_website']:
            score += 40
        if row['contact_email']:
            score += 30
        if pd.notna(row['company_name']) and len(row['company_name']) > 3:
            score += 20
        return score
This pipeline demonstrates how domain finding fits into a broader lead enrichment workflow, combining multiple data sources to create scored, actionable leads.
Market Research Automation
class CompetitorAnalyzer:
    def __init__(self, api_key):
        self.domain_finder = CachedDomainFinder(api_key)

    def analyze_market_segment(self, company_list, output_file):
        """Analyze companies in a market segment."""
        # Enrich all companies with domains
        results = self.domain_finder.bulk_find_with_cache(company_list)

        analysis_data = []
        for company, result in results.items():
            domain_data = result.get('data', {})
            analysis_data.append({
                'company_name': company,
                'website': domain_data.get('domain'),
                'confidence': domain_data.get('confidence_level', 0),
                'has_website': bool(domain_data.get('domain')),
                'tld': self._extract_tld(domain_data.get('domain'))
            })

        df = pd.DataFrame(analysis_data)

        # Generate market insights
        insights = {
            'total_companies': len(df),
            'companies_with_websites': df['has_website'].sum(),
            'average_confidence': df['confidence'].mean(),
            'top_tlds': df['tld'].value_counts().head().to_dict(),
            'website_coverage': (df['has_website'].sum() / len(df)) * 100
        }

        # Export results
        with pd.ExcelWriter(output_file) as writer:
            df.to_excel(writer, sheet_name='Company Data', index=False)
            pd.DataFrame([insights]).to_excel(writer, sheet_name='Market Insights', index=False)

        return df, insights

    def _extract_tld(self, domain):
        """Extract the top-level domain from a URL."""
        if not domain:
            return None

        # Remove protocol and www
        clean_domain = domain.replace('http://', '').replace('https://', '').replace('www.', '')

        # Extract TLD
        parts = clean_domain.split('.')
        return f".{parts[-1]}" if len(parts) > 1 else None
This market research tool helps business analysts understand digital presence patterns across industry segments, providing insights into competitor website strategies.
API Rate Limits and Credit Management
Understanding and managing API limits is crucial for production applications. CUFinder’s Company Name to Domain API provides transparent credit tracking, but rate limits and credit balances still need to be handled gracefully in your code.
class CreditAwareProcessor:
    def __init__(self, api_key, min_credits_threshold=100):
        self.finder = RobustDomainFinder(api_key)
        self.min_credits = min_credits_threshold
        self.current_credits = None
        self.request_count = 0

    def check_credits(self):
        """Check remaining credits before processing."""
        test_result = self.finder.find_domain("test company")
        if 'data' in test_result:
            self.current_credits = test_result['data'].get('credit_count', 0)
            return self.current_credits
        raise Exception("Unable to check credit balance")

    def process_with_credit_monitoring(self, company_names):
        """Process companies while monitoring credit usage."""
        if not self.current_credits:
            self.check_credits()

        if self.current_credits < len(company_names):
            raise Exception(
                f"Insufficient credits: {self.current_credits} available, "
                f"{len(company_names)} needed"
            )

        results = []
        batch_size = 25  # Process in smaller batches for credit monitoring
        for i in range(0, len(company_names), batch_size):
            batch = company_names[i:i + batch_size]

            # Check credits before each batch
            if i > 0 and self.current_credits < self.min_credits:
                logging.warning(f"Credits running low: {self.current_credits} remaining")
                break

            batch_results = self._process_batch_with_tracking(batch)
            results.extend(batch_results)

            # Update credit count from the last response
            if batch_results and batch_results[-1]['credits_after'] is not None:
                self.current_credits = batch_results[-1]['credits_after']
        return results

    def _process_batch_with_tracking(self, companies):
        """Process a batch while tracking API usage."""
        results = []
        for company in companies:
            result = self.finder.find_domain_with_retry(company)
            self.request_count += 1

            # Track credit usage
            if 'data' in result:
                self.current_credits = result['data'].get('credit_count')

            results.append({
                'company': company,
                'result': result,
                'request_number': self.request_count,
                'credits_after': self.current_credits
            })

            # Rate limiting
            time.sleep(0.1)
        return results


# Usage with credit monitoring
processor = CreditAwareProcessor(api_key, min_credits_threshold=50)
companies = pd.read_csv('companies.csv')['company_name'].tolist()

try:
    results = processor.process_with_credit_monitoring(companies)
    print(f"Processed {len(results)} companies, {processor.current_credits} credits remaining")
except Exception as e:
    print(f"Processing stopped: {e}")
This implementation prevents unexpected API limit overruns while providing visibility into credit consumption patterns.
Testing and Validation Strategies
Robust testing ensures your domain finding implementation handles edge cases gracefully. Here’s a comprehensive testing approach:
import unittest
from unittest.mock import Mock, patch


class TestCompanyDomainFinder(unittest.TestCase):
    def setUp(self):
        self.api_key = "test_key"
        # RobustDomainFinder inherits find_domain and adds the
        # name-cleaning helper exercised below
        self.finder = RobustDomainFinder(self.api_key)

    @patch('requests.post')
    def test_successful_domain_lookup(self, mock_post):
        # Mock a successful API response
        mock_response = Mock()
        mock_response.json.return_value = {
            'status': 1,
            'data': {
                'domain': 'https://apple.com',
                'confidence_level': 94,
                'query': 'Apple Inc',
                'credit_count': 9997
            }
        }
        mock_response.raise_for_status.return_value = None
        mock_post.return_value = mock_response

        result = self.finder.find_domain('Apple Inc')

        self.assertEqual(result['data']['domain'], 'https://apple.com')
        self.assertEqual(result['data']['confidence_level'], 94)
        mock_post.assert_called_once()

    @patch('requests.post')
    def test_api_error_handling(self, mock_post):
        # Mock an API error
        mock_post.side_effect = requests.exceptions.RequestException("Network error")

        result = self.finder.find_domain('Test Company')

        self.assertIn('error', result)
        self.assertIn('Network error', result['error'])

    def test_input_validation(self):
        # Test empty company name
        with self.assertRaises(ValueError):
            self.finder.find_domain('')

        # Test None input
        with self.assertRaises(ValueError):
            self.finder.find_domain(None)

    def test_company_name_cleaning(self):
        test_cases = [
            ('Apple Inc.', 'Apple'),
            ('Microsoft Corporation', 'Microsoft Corporation'),
            ('Google LLC', 'Google'),
            ('Amazon.com Inc', 'Amazon.com')
        ]
        for input_name, expected in test_cases:
            cleaned = self.finder._clean_company_name(input_name)
            self.assertEqual(cleaned, expected)


# Integration tests with the real API (run sparingly)
class TestAPIIntegration(unittest.TestCase):
    def setUp(self):
        self.api_key = os.getenv('CUFINDER_API_KEY')
        if not self.api_key:
            self.skipTest("API key not available")
        self.finder = RobustDomainFinder(self.api_key)

    def test_known_companies(self):
        """Test with well-known companies that should have high confidence."""
        test_companies = ['Apple', 'Microsoft', 'Google', 'Amazon']
        for company in test_companies:
            result = self.finder.find_domain_with_retry(company)
            self.assertIn('data', result)
            self.assertIsNotNone(result['data']['domain'])
            self.assertGreater(result['data']['confidence_level'], 80)

    def test_rate_limiting(self):
        """Test that per-request delays add up as expected."""
        companies = ['Test Company ' + str(i) for i in range(10)]
        start_time = time.time()

        results = []
        for company in companies:
            result = self.finder.find_domain_with_retry(company)
            results.append(result)
            time.sleep(0.1)  # The delay under test

        elapsed = time.time() - start_time
        # Should take at least 1 second with 0.1s delays
        self.assertGreater(elapsed, 0.9)


if __name__ == '__main__':
    unittest.main()
These tests cover both unit testing with mocked responses and integration testing with the live API, ensuring your implementation handles real-world scenarios correctly.
Monitoring and Logging for Production
Production applications need comprehensive monitoring to track API performance, error rates, and data quality. Here’s a monitoring framework:
import json
import logging
import sqlite3
import time
from datetime import datetime


class DomainFinderMonitor:
    def __init__(self, api_key, db_path='domain_finder_logs.db'):
        self.finder = RobustDomainFinder(api_key)
        self.db_path = db_path
        self.setup_database()
        self.setup_logging()

    def setup_database(self):
        """Initialize the monitoring database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS api_requests (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                company_name TEXT,
                domain_found TEXT,
                confidence_level INTEGER,
                response_time_ms INTEGER,
                credits_used INTEGER,
                error_message TEXT,
                success BOOLEAN
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS daily_stats (
                date DATE PRIMARY KEY,
                total_requests INTEGER,
                successful_requests INTEGER,
                average_confidence REAL,
                total_credits_used INTEGER,
                unique_companies INTEGER
            )
        ''')
        conn.commit()
        conn.close()

    def setup_logging(self):
        """Configure structured logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('domain_finder.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def find_domain_monitored(self, company_name):
        """Find domain with comprehensive monitoring."""
        start_time = time.time()
        try:
            result = self.finder.find_domain_with_retry(company_name)
            response_time = int((time.time() - start_time) * 1000)

            # Log to database
            self._log_request(
                company_name=company_name,
                result=result,
                response_time_ms=response_time,
                success='data' in result
            )

            # Structured logging
            self.logger.info(
                "API Request Completed",
                extra={
                    'company_name': company_name,
                    'response_time_ms': response_time,
                    'success': 'data' in result,
                    'confidence': result.get('data', {}).get('confidence_level', 0)
                }
            )
            return result

        except Exception as e:
            response_time = int((time.time() - start_time) * 1000)
            self._log_request(
                company_name=company_name,
                result={'error': str(e)},
                response_time_ms=response_time,
                success=False
            )
            self.logger.error(
                f"API Request Failed: {str(e)}",
                extra={
                    'company_name': company_name,
                    'response_time_ms': response_time,
                    'error': str(e)
                }
            )
            raise

    def _log_request(self, company_name, result, response_time_ms, success):
        """Log request details to the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        data = result.get('data', {})
        cursor.execute('''
            INSERT INTO api_requests
            (timestamp, company_name, domain_found, confidence_level,
             response_time_ms, credits_used, error_message, success)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            datetime.now(),
            company_name,
            data.get('domain'),
            data.get('confidence_level', 0),
            response_time_ms,
            1 if success else 0,  # Credit estimation: one credit per successful lookup
            result.get('error'),
            success
        ))
        conn.commit()
        conn.close()

    def generate_daily_report(self, date=None):
        """Generate a daily performance report."""
        if not date:
            date = datetime.now().date()

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            SELECT
                COUNT(*) AS total_requests,
                SUM(CASE WHEN success THEN 1 ELSE 0 END) AS successful_requests,
                AVG(CASE WHEN success THEN confidence_level ELSE NULL END) AS avg_confidence,
                SUM(credits_used) AS total_credits,
                COUNT(DISTINCT company_name) AS unique_companies,
                AVG(response_time_ms) AS avg_response_time
            FROM api_requests
            WHERE DATE(timestamp) = ?
        ''', (date,))
        stats = cursor.fetchone()
        conn.close()

        return {
            'date': str(date),
            'total_requests': stats[0],
            'successful_requests': stats[1],
            'success_rate': (stats[1] / stats[0] * 100) if stats[0] > 0 else 0,
            'average_confidence': round(stats[2] or 0, 2),
            'total_credits_used': stats[3],
            'unique_companies': stats[4],
            'average_response_time_ms': round(stats[5] or 0, 2)
        }


# Usage with monitoring
monitor = DomainFinderMonitor(api_key)

# Process companies with full monitoring
companies = ['Apple', 'Microsoft', 'InvalidCompany123']
for company in companies:
    try:
        result = monitor.find_domain_monitored(company)
        print(f"Found domain for {company}: {result.get('data', {}).get('domain')}")
    except Exception as e:
        print(f"Failed to process {company}: {e}")

# Generate daily report
daily_report = monitor.generate_daily_report()
print(json.dumps(daily_report, indent=2))
This monitoring framework provides the observability needed for production deployments, tracking performance metrics, error rates, and credit consumption patterns.
Advanced Integration Patterns
For enterprise applications, you’ll often need to integrate domain finding with existing systems and workflows. Here are advanced patterns that solve real business problems:
CRM Integration with Webhook Automation
import hashlib
import hmac
import os
import threading

from flask import Flask, jsonify, request


class CRMDomainEnricher:
    def __init__(self, api_key):
        self.finder = CachedDomainFinder(api_key)
        self.app = Flask(__name__)
        self.setup_routes()

    def setup_routes(self):
        """Set up webhook endpoints for CRM integration."""
        @self.app.route('/webhook/company-created', methods=['POST'])
        def handle_company_created():
            """Handle a new-company-created webhook."""
            # Verify the webhook signature (example for HubSpot)
            if not self._verify_signature(request):
                return jsonify({'error': 'Invalid signature'}), 401

            data = request.json
            company_name = data.get('company', {}).get('name')
            company_id = data.get('company', {}).get('id')

            if company_name and company_id:
                # Enrich the domain asynchronously
                self._enrich_company_async(company_id, company_name)
                return jsonify({'status': 'processing'}), 200

            return jsonify({'error': 'Missing company data'}), 400

        @self.app.route('/webhook/batch-enrich', methods=['POST'])
        def handle_batch_enrichment():
            """Handle batch enrichment requests."""
            companies = request.json.get('companies', [])
            if len(companies) > 100:
                return jsonify({'error': 'Too many companies (max 100)'}), 400

            results = self.finder.bulk_find_with_cache([c['name'] for c in companies])

            # Format results for the CRM update
            enriched = []
            for company in companies:
                domain_data = results.get(company['name'], {}).get('data', {})
                enriched.append({
                    'id': company['id'],
                    'name': company['name'],
                    'domain': domain_data.get('domain'),
                    'confidence': domain_data.get('confidence_level', 0)
                })
            return jsonify({'companies': enriched}), 200

    def _verify_signature(self, request):
        """Verify the webhook signature for security."""
        signature = request.headers.get('X-Hub-Signature')
        if not signature:
            return False

        expected = hmac.new(
            os.getenv('WEBHOOK_SECRET').encode(),
            request.data,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, f'sha256={expected}')

    def _enrich_company_async(self, company_id, company_name):
        """Asynchronously enrich a company and update the CRM."""
        def enrich_and_update():
            try:
                result = self.finder.find_domain_cached(company_name)
                if 'data' in result:
                    domain = result['data'].get('domain')
                    confidence = result['data'].get('confidence_level', 0)
                    # Update the CRM via API (example for HubSpot)
                    self._update_hubspot_company(company_id, domain, confidence)
            except Exception as e:
                logging.error(f"Failed to enrich company {company_id}: {e}")

        threading.Thread(target=enrich_and_update).start()

    def _update_hubspot_company(self, company_id, domain, confidence):
        """Update a HubSpot company with the enriched domain."""
        hubspot_api_key = os.getenv('HUBSPOT_API_KEY')
        update_data = {
            'properties': {
                'website': domain,
                'domain_confidence': confidence,
                'last_enriched': datetime.now().isoformat()
            }
        }
        response = requests.patch(
            f'https://api.hubapi.com/crm/v3/objects/companies/{company_id}',
            headers={'Authorization': f'Bearer {hubspot_api_key}'},
            json=update_data
        )
        if response.status_code == 200:
            logging.info(f"Updated HubSpot company {company_id} with domain {domain}")
        else:
            logging.error(f"Failed to update HubSpot company {company_id}: {response.text}")


# Run the webhook server
enricher = CRMDomainEnricher(api_key)
enricher.app.run(host='0.0.0.0', port=5000)
This webhook integration automatically enriches companies as they’re created in your CRM, ensuring your sales team always has up-to-date website information.
Data Pipeline with Apache Airflow
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def create_domain_enrichment_dag():
    """Create Airflow DAG for daily domain enrichment"""
    default_args = {
        'owner': 'data-team',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    }
    dag = DAG(
        'company_domain_enrichment',
        default_args=default_args,
        description='Daily company domain enrichment pipeline',
        schedule_interval='0 6 * * *',  # Run daily at 6 AM
        catchup=False
    )

    def extract_companies_for_enrichment(**context):
        """Extract companies that need domain enrichment"""
        import pandas as pd
        from sqlalchemy import create_engine

        engine = create_engine(os.getenv('DATABASE_URL'))
        # Find companies without domains or with stale data
        query = """
            SELECT id, company_name
            FROM companies
            WHERE website_url IS NULL
               OR last_enriched < NOW() - INTERVAL '30 days'
            LIMIT 1000
        """
        df = pd.read_sql(query, engine)
        # Save to a temporary file for the next task
        df.to_csv('/tmp/companies_to_enrich.csv', index=False)
        return f"Found {len(df)} companies to enrich"

    def enrich_domains(**context):
        """Enrich company domains using CUFinder API"""
        import pandas as pd

        # Load companies from the previous task
        df = pd.read_csv('/tmp/companies_to_enrich.csv')
        # Enrich domains
        finder = CachedDomainFinder(os.getenv('CUFINDER_API_KEY'))
        processor = BulkDomainProcessor(finder.api_key)
        enriched_df = processor.process_dataframe(df, 'company_name')
        # Save enriched data
        enriched_df.to_csv('/tmp/enriched_companies.csv', index=False)
        return f"Enriched {len(enriched_df)} companies"

    def update_database(**context):
        """Update database with enriched domains"""
        import pandas as pd
        from sqlalchemy import create_engine, text

        engine = create_engine(os.getenv('DATABASE_URL'))
        df = pd.read_csv('/tmp/enriched_companies.csv')
        # Update companies with new domain data
        update_query = text("""
            UPDATE companies
            SET website_url = :domain,
                domain_confidence = :confidence,
                last_enriched = NOW()
            WHERE id = :id
        """)
        with engine.begin() as conn:
            for _, row in df.iterrows():
                if pd.notna(row['domain']):
                    conn.execute(update_query, {
                        'domain': row['domain'],
                        'confidence': row['confidence_level'],
                        'id': row['id']
                    })
        return f"Updated {len(df[df['domain'].notna()])} company records"

    # Define tasks
    extract_task = PythonOperator(
        task_id='extract_companies',
        python_callable=extract_companies_for_enrichment,
        dag=dag
    )
    enrich_task = PythonOperator(
        task_id='enrich_domains',
        python_callable=enrich_domains,
        dag=dag
    )
    update_task = PythonOperator(
        task_id='update_database',
        python_callable=update_database,
        dag=dag
    )
    cleanup_task = BashOperator(
        task_id='cleanup_temp_files',
        bash_command='rm -f /tmp/companies_to_enrich.csv /tmp/enriched_companies.csv',
        dag=dag
    )

    # Set task dependencies
    extract_task >> enrich_task >> update_task >> cleanup_task
    return dag

# Create the DAG
domain_enrichment_dag = create_domain_enrichment_dag()
This Airflow DAG automates daily domain enrichment for large datasets, handling failures gracefully and providing audit trails for compliance requirements.
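The DAG's tasks hand data to each other through CSV files rather than XComs, so it's worth sanity-checking that round-trip in isolation. The snippet below simulates the extract-to-enrich handoff with toy data and a temporary directory (a stand-in for the fixed `/tmp` paths in the DAG):

```python
import os
import tempfile

import pandas as pd

# Toy stand-in for the extract task's output
df = pd.DataFrame({'id': [1, 2], 'company_name': ['Acme Corp', 'Globex']})

# Write where the extract task would, then read as the enrich task would
path = os.path.join(tempfile.mkdtemp(), 'companies_to_enrich.csv')
df.to_csv(path, index=False)
loaded = pd.read_csv(path)

# Columns and row count survive the round-trip intact
assert list(loaded.columns) == ['id', 'company_name']
assert len(loaded) == 2
```

If your company IDs are strings that look numeric (e.g. zero-padded), pass `dtype={'id': str}` to `read_csv`, since CSV round-trips otherwise lose that distinction.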
Working with CUFinder’s Enrichment Engine
While the API provides programmatic access, CUFinder’s enrichment engine offers a user-friendly interface for bulk operations. You can combine both approaches for maximum flexibility:
class HybridEnrichmentWorkflow:
    def __init__(self, api_key):
        self.api_key = api_key
        self.finder = CachedDomainFinder(api_key)

    def process_large_dataset(self, csv_file, threshold=1000):
        """
        Process large datasets using a hybrid approach:
        - API for small batches
        - Enrichment Engine for large volumes
        """
        df = pd.read_csv(csv_file)
        total_companies = len(df['company_name'].dropna().unique())
        if total_companies <= threshold:
            # Use API for smaller datasets
            return self._process_via_api(df)
        else:
            # Use Enrichment Engine for larger datasets
            return self._process_via_enrichment_engine(df)

    def _process_via_api(self, df):
        """Process using direct API calls"""
        processor = BulkDomainProcessor(self.api_key)
        return processor.process_dataframe(df, 'company_name')

    def _process_via_enrichment_engine(self, df):
        """Process via Enrichment Engine with API monitoring"""
        # Prepare file for Enrichment Engine
        input_file = 'large_dataset_input.csv'
        df[['company_name']].to_csv(input_file, index=False)
        print(f"""
        Large dataset detected ({len(df)} companies).
        For optimal processing:
        1. Upload {input_file} to CUFinder's Enrichment Engine
        2. Select 'Find website from company name' service
        3. Map 'company_name' column as input
        4. Run enrichment and download results
        5. Use merge_enrichment_results() to combine with original data
        Enrichment Engine URL: https://cufinder.io/enrichment-engine/company-name-to-domain
        """)
        return df

    def merge_enrichment_results(self, original_df, enriched_csv):
        """Merge results from Enrichment Engine back with original data"""
        enriched_df = pd.read_csv(enriched_csv)
        # Merge on company name
        merged = original_df.merge(
            enriched_df,
            on='company_name',
            how='left'
        )
        return merged

    def validate_enrichment_quality(self, df):
        """Validate enrichment results quality"""
        if 'domain' not in df.columns:
            return {'error': 'No domain column found'}
        total_companies = len(df)
        enriched_companies = df['domain'].notna().sum()
        if 'confidence_level' in df.columns:
            high_confidence = (df['confidence_level'] > 80).sum()
            average_confidence = df['confidence_level'].mean()
        else:
            high_confidence = 0
            average_confidence = 0
        quality_metrics = {
            'total_companies': total_companies,
            'enriched_companies': enriched_companies,
            'enrichment_rate': (enriched_companies / total_companies) * 100 if total_companies > 0 else 0,
            'high_confidence_matches': high_confidence,
            'high_confidence_rate': (high_confidence / total_companies) * 100 if total_companies > 0 else 0,
            'average_confidence': average_confidence
        }
        return quality_metrics

# Usage example
workflow = HybridEnrichmentWorkflow(api_key)

# Process any size dataset automatically
result_df = workflow.process_large_dataset('companies.csv')

# If using Enrichment Engine, merge results later
if 'domain' not in result_df.columns:
    # User processed via Enrichment Engine
    original_df = pd.read_csv('companies.csv')
    final_df = workflow.merge_enrichment_results(original_df, 'enrichment_results.csv')
    quality = workflow.validate_enrichment_quality(final_df)
    print(f"Enrichment quality: {quality}")
This hybrid approach maximizes efficiency by using the API for smaller datasets and guiding users to the Enrichment Engine for larger volumes, while maintaining data quality validation throughout the process.
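One detail worth understanding before trusting `merge_enrichment_results()`: it performs a left join on `company_name`, so companies the Enrichment Engine didn't resolve come back with a missing `domain`. Pandas' `indicator` flag makes those gaps easy to audit, as this toy-data sketch shows:

```python
import pandas as pd

original = pd.DataFrame({'company_name': ['Acme', 'Globex', 'Initech']})
enriched = pd.DataFrame({
    'company_name': ['Acme', 'Initech'],
    'domain': ['acme.com', 'initech.com']
})

# Same left join as merge_enrichment_results(), plus a _merge audit column
merged = original.merge(enriched, on='company_name', how='left', indicator=True)

# Rows marked 'left_only' had no enrichment result
missing = merged[merged['_merge'] == 'left_only']['company_name'].tolist()
assert missing == ['Globex']
assert len(merged) == 3  # left join never drops original rows
```

Feeding the `missing` list back into a retry pass (or a manual review queue) keeps your enrichment rate honest instead of silently carrying nulls forward.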
Best Practices and Troubleshooting
Based on real-world usage patterns, here are essential best practices for production implementations:
Data Quality and Validation
import re

class QualityController:
    def __init__(self):
        self.validation_rules = {
            'min_confidence': 70,
            'valid_tlds': ['.com', '.org', '.net', '.edu', '.gov', '.io', '.co'],
            'suspicious_domains': ['example.com', 'test.com', 'localhost']
        }

    def validate_domain_result(self, company_name, result):
        """Comprehensive validation of domain results"""
        validation_results = {
            'is_valid': True,
            'warnings': [],
            'errors': []
        }
        if 'data' not in result:
            validation_results['is_valid'] = False
            validation_results['errors'].append('No data in API response')
            return validation_results

        data = result['data']
        domain = data.get('domain', '')
        confidence = data.get('confidence_level', 0)

        # Check confidence threshold
        if confidence < self.validation_rules['min_confidence']:
            validation_results['warnings'].append(f'Low confidence: {confidence}%')

        # Validate domain format
        if domain and not self._is_valid_domain_format(domain):
            validation_results['is_valid'] = False
            validation_results['errors'].append(f'Invalid domain format: {domain}')

        # Check for suspicious domains
        if any(suspicious in domain.lower() for suspicious in self.validation_rules['suspicious_domains']):
            validation_results['warnings'].append(f'Suspicious domain detected: {domain}')

        # Validate TLD
        if domain and not any(domain.lower().endswith(tld) for tld in self.validation_rules['valid_tlds']):
            validation_results['warnings'].append(f'Uncommon TLD: {domain}')

        # Check domain-company name relevance
        relevance_score = self._calculate_relevance(company_name, domain)
        if relevance_score < 0.3:
            validation_results['warnings'].append(f'Low name-domain relevance: {relevance_score:.2f}')

        return validation_results

    def _is_valid_domain_format(self, domain):
        """Validate domain format using regex"""
        # Remove protocol if present
        clean_domain = re.sub(r'^https?://', '', domain)
        # Basic domain validation
        pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
        return re.match(pattern, clean_domain) is not None

    def _calculate_relevance(self, company_name, domain):
        """Calculate relevance between company name and domain"""
        if not domain or not company_name:
            return 0
        # Extract domain name without TLD
        clean_domain = domain.replace('http://', '').replace('https://', '').replace('www.', '')
        domain_name = clean_domain.split('.')[0].lower()
        # Clean company name
        clean_company = re.sub(r'[^a-zA-Z0-9]', '', company_name.lower())

        # Calculate similarity using Levenshtein distance
        def levenshtein_similarity(s1, s2):
            if len(s1) < len(s2):
                return levenshtein_similarity(s2, s1)
            if len(s2) == 0:
                return 0
            previous_row = range(len(s2) + 1)
            for i, c1 in enumerate(s1):
                current_row = [i + 1]
                for j, c2 in enumerate(s2):
                    insertions = previous_row[j + 1] + 1
                    deletions = current_row[j] + 1
                    substitutions = previous_row[j] + (c1 != c2)
                    current_row.append(min(insertions, deletions, substitutions))
                previous_row = current_row
            max_len = max(len(s1), len(s2))
            return 1 - (previous_row[-1] / max_len)

        return levenshtein_similarity(clean_company, domain_name)

# Enhanced finder with quality control
class QualityControlledFinder(CachedDomainFinder):
    def __init__(self, api_key):
        super().__init__(api_key)
        self.quality_controller = QualityController()

    def find_domain_with_quality_check(self, company_name):
        """Find domain with automatic quality validation"""
        result = self.find_domain_cached(company_name)
        # Validate result quality
        validation = self.quality_controller.validate_domain_result(company_name, result)
        # Add validation results to response
        result['quality_check'] = validation
        # Log quality issues
        if validation['warnings']:
            logging.warning(f"Quality warnings for {company_name}: {validation['warnings']}")
        if validation['errors']:
            logging.error(f"Quality errors for {company_name}: {validation['errors']}")
        return result
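If you want to tune the 0.3 relevance threshold above, it helps to see what the Levenshtein similarity actually returns for realistic inputs. Here is the same metric as a standalone function with two worked cases (scores are exact: identical strings give 1.0, and "acmecorp" vs "acme" gives 0.5 because four deletions span half of the longer string):

```python
def levenshtein_similarity(s1: str, s2: str) -> float:
    """Normalized similarity: 1.0 for identical strings, near 0 for unrelated ones."""
    if len(s1) < len(s2):
        return levenshtein_similarity(s2, s1)
    if len(s2) == 0:
        return 0.0
    previous_row = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row
    # previous_row[-1] is the edit distance; normalize by the longer string
    return 1 - previous_row[-1] / max(len(s1), len(s2))

assert levenshtein_similarity('acmecorp', 'acmecorp') == 1.0
assert levenshtein_similarity('acmecorp', 'acme') == 0.5  # 4 edits / 8 chars
```

A threshold of 0.3 therefore tolerates substantial abbreviation (acronym-style domains) while still flagging completely unrelated matches.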
Performance Optimization Patterns
class PerformanceOptimizer:
    def __init__(self, api_key):
        self.finder = QualityControlledFinder(api_key)
        self.performance_stats = {
            'total_requests': 0,
            'cache_hits': 0,
            'api_calls': 0,
            'total_time': 0
        }

    def optimize_batch_processing(self, companies, batch_size=25, max_workers=3):
        """Optimized batch processing with adaptive sizing"""
        # Remove duplicates while preserving order
        unique_companies = list(dict.fromkeys(companies))

        # Pre-check cache for all companies
        cached_results = {}
        uncached_companies = []
        for company in unique_companies:
            cache_key = self.finder._get_cache_key(company)
            cached = self.finder.redis_client.get(cache_key)
            if cached:
                cached_results[company] = json.loads(cached)
                self.performance_stats['cache_hits'] += 1
            else:
                uncached_companies.append(company)

        # Process uncached companies in optimized batches
        if uncached_companies:
            start_time = time.time()
            # Adaptive batch sizing based on API performance
            adaptive_batch_size = self._calculate_optimal_batch_size(len(uncached_companies))
            uncached_results = self._process_with_adaptive_batching(
                uncached_companies,
                adaptive_batch_size,
                max_workers
            )
            # Update performance stats
            self.performance_stats['api_calls'] += len(uncached_companies)
            self.performance_stats['total_time'] += time.time() - start_time
            # Merge cached and uncached results
            cached_results.update(uncached_results)

        # Update total requests
        self.performance_stats['total_requests'] += len(unique_companies)
        return cached_results

    def _calculate_optimal_batch_size(self, total_companies):
        """Calculate optimal batch size based on historical performance"""
        if self.performance_stats['api_calls'] == 0:
            return 25  # Default batch size
        avg_time_per_request = self.performance_stats['total_time'] / self.performance_stats['api_calls']
        # Adjust batch size based on performance
        if avg_time_per_request < 0.5:  # Fast responses
            return min(50, total_companies)
        elif avg_time_per_request < 1.0:  # Medium responses
            return min(25, total_companies)
        else:  # Slow responses
            return min(10, total_companies)

    def _process_with_adaptive_batching(self, companies, batch_size, max_workers):
        """Process companies with adaptive batching and error recovery"""
        results = {}
        failed_companies = []
        for i in range(0, len(companies), batch_size):
            batch = companies[i:i+batch_size]
            try:
                batch_results = self._process_batch_parallel(batch, max_workers)
                results.update(batch_results)
            except Exception as e:
                logging.error(f"Batch processing failed: {e}")
                # Fall back to sequential processing for the failed batch
                for company in batch:
                    try:
                        result = self.finder.find_domain_with_quality_check(company)
                        results[company] = result
                    except Exception as company_error:
                        logging.error(f"Failed to process {company}: {company_error}")
                        failed_companies.append(company)

        # Retry failed companies with exponential backoff
        if failed_companies:
            self._retry_failed_companies(failed_companies, results)
        return results

    def _process_batch_parallel(self, companies, max_workers):
        """Process batch with parallel execution"""
        results = {}
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_company = {
                executor.submit(self.finder.find_domain_with_quality_check, company): company
                for company in companies
            }
            for future in as_completed(future_to_company):
                company = future_to_company[future]
                try:
                    result = future.result(timeout=30)  # 30-second timeout
                    results[company] = result
                except Exception as e:
                    logging.error(f"Failed to process {company}: {e}")
                    results[company] = {'error': str(e)}
        return results

    def _retry_failed_companies(self, failed_companies, results):
        """Retry failed companies with exponential backoff"""
        max_retries = 3
        base_delay = 1
        for retry in range(max_retries):
            if not failed_companies:
                break
            delay = base_delay * (2 ** retry)
            logging.info(f"Retrying {len(failed_companies)} companies (attempt {retry + 1}/{max_retries})")
            time.sleep(delay)
            still_failed = []
            for company in failed_companies:
                try:
                    result = self.finder.find_domain_with_quality_check(company)
                    results[company] = result
                except Exception as e:
                    logging.error(f"Retry failed for {company}: {e}")
                    still_failed.append(company)
            failed_companies = still_failed

    def get_performance_summary(self):
        """Get performance summary statistics"""
        if self.performance_stats['total_requests'] == 0:
            return "No requests processed yet"
        cache_hit_rate = (self.performance_stats['cache_hits'] / self.performance_stats['total_requests']) * 100
        avg_time_per_api_call = self.performance_stats['total_time'] / max(self.performance_stats['api_calls'], 1)
        return {
            'total_requests': self.performance_stats['total_requests'],
            'cache_hit_rate': f"{cache_hit_rate:.1f}%",
            'api_calls_made': self.performance_stats['api_calls'],
            'average_api_response_time': f"{avg_time_per_api_call:.2f}s",
            'total_processing_time': f"{self.performance_stats['total_time']:.2f}s"
        }
This performance optimization framework provides adaptive batching, intelligent caching, and comprehensive error recovery, ensuring your application maintains high performance even under challenging conditions.
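The retry helper waits `base_delay * 2**retry` seconds between attempts, so worst-case added latency grows quickly with `max_retries`. This tiny sketch (the `backoff_schedule` helper is introduced here for illustration) makes the schedule explicit so you can budget timeouts before tuning:

```python
def backoff_schedule(max_retries: int, base_delay: float = 1.0) -> list:
    """Delays, in seconds, that _retry_failed_companies would sleep between attempts."""
    return [base_delay * (2 ** retry) for retry in range(max_retries)]

delays = backoff_schedule(3)
assert delays == [1.0, 2.0, 4.0]
assert sum(delays) == 7.0  # worst-case added wait across all retries
```

With the defaults above (three retries, one-second base), a company that fails every attempt adds seven seconds of sleep on top of the API calls themselves, worth knowing when the optimizer runs inside a request handler rather than a background job.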
Getting Started with CUFinder’s Company Name to Domain API
Ready to transform your Python applications with automated domain finding? CUFinder’s Company Name to Domain API offers a reliable solution for converting company names to website URLs at scale.
Whether you’re building lead generation pipelines, enriching CRM data, or conducting market research, you now have the tools and knowledge to implement enterprise-grade domain finding in Python. The examples and patterns in this guide provide everything you need to handle real-world scenarios—from simple single-company lookups to complex batch processing workflows.
For more comprehensive data enrichment capabilities, explore CUFinder’s complete API suite, which includes email finding, company enrichment, tech stack detection, and 15+ other services that integrate seamlessly with the domain finding functionality.
Start building more intelligent applications today with CUFinder’s Company Name to Domain API. Sign up for your API key and transform how your Python applications handle company data enrichment.
⚡ Explore CUFinder APIs
Enrich people and companies at scale. Real-time endpoints for email, phone, revenue, tech stack, LinkedIn data, and more.



