The Business Need
A commodity trading firm needed real-time visibility into global pricing across agricultural products. They were manually tracking prices from dozens of sources—government databases, exchange feeds, industry publications, and regional markets.
Our mandate: automate the collection, normalize the data, and surface actionable insights.
The Challenge
47 data sources across 12 countries, each with its own:
- Data format (HTML tables, PDFs, APIs, Excel downloads)
- Update frequency (real-time to monthly)
- Language and currency
- Quality characteristics
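The first step was encoding that heterogeneity as per-source configuration. A minimal sketch of what such a config might look like (the field names here are illustrative, not the production schema):

```python
from dataclasses import dataclass

@dataclass
class SourceConfig:
    # Illustrative fields; a production schema would carry many more
    name: str
    country: str
    format: str            # "html" | "pdf" | "api" | "xlsx"
    update_frequency: str  # "realtime" | "daily" | "weekly" | "monthly"
    language: str
    currency: str
    rate_limit: int        # requests per minute

# Hypothetical example entry
usda = SourceConfig(
    name="usda_grain", country="US", format="api",
    update_frequency="daily", language="en",
    currency="USD", rate_limit=30,
)
```

Keeping this metadata declarative meant new sources could be onboarded without touching collector code.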
Architecture Overview
[Data Sources] → [Collectors] → [Raw Store] → [Normalizers] → [Clean Store] → [Analytics]
                      ↓              ↓              ↓               ↓
                 [Scheduler]    [S3/Parquet]   [Validation]    [PostgreSQL]
Collection Layer
We built source-specific collectors that handle the unique requirements of each data source:
from abc import ABC, abstractmethod

class CollectorBase(ABC):
    def __init__(self, source_config):
        self.config = source_config
        self.rate_limiter = RateLimiter(
            requests_per_minute=source_config.rate_limit
        )

    async def collect(self) -> RawData:
        async with self.rate_limiter:
            response = await self.fetch()
            return self.parse(response)

    @abstractmethod
    async def fetch(self) -> Response:
        pass

    @abstractmethod
    def parse(self, response: Response) -> RawData:
        pass
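To make the fetch/parse split concrete, here is a stripped-down, hypothetical HTML collector. `fetch()` is stubbed with static markup so the example runs without a live source or the rate limiter; a real collector would make an HTTP request and use a proper HTML parser:

```python
import asyncio

class ExampleHtmlCollector:
    """Hypothetical collector: fetch() returns canned HTML so the
    parse step can be demonstrated offline."""

    async def fetch(self) -> str:
        # In production: an HTTP GET behind the rate limiter
        return "<tr><td>wheat</td><td>212.50</td></tr>"

    def parse(self, html: str) -> dict:
        # Source-specific parsing; real collectors used a proper parser
        cells = html.replace("<tr>", "").replace("</tr>", "").split("</td><td>")
        commodity = cells[0].replace("<td>", "")
        price = float(cells[1].replace("</td>", ""))
        return {"commodity": commodity, "price": price}

    async def collect(self) -> dict:
        return self.parse(await self.fetch())

record = asyncio.run(ExampleHtmlCollector().collect())
# record == {"commodity": "wheat", "price": 212.5}
```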
Avoiding Detection
Web scraping at scale requires careful attention to anti-bot measures:
- Respectful rate limiting: We never exceeded what a human user would generate
- Request variation: Randomized headers, timing, and user agents
- IP rotation: Pool of residential proxies for sensitive sources
- Session management: Maintained realistic session patterns
Most importantly: we only scraped public data that sources intended to be accessible.
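The request-variation tactic can be sketched as a small helper. The header pool and delay bounds below are illustrative, not the production values:

```python
import random

# Illustrative pool; production rotated a much larger, current set
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
]

def build_request_profile(min_delay: float = 2.0, max_delay: float = 8.0) -> dict:
    """Randomize headers and inter-request delay so traffic does not
    carry a machine-regular fingerprint."""
    return {
        "headers": {
            "User-Agent": random.choice(USER_AGENTS),
            "Accept-Language": random.choice(["en-US,en;q=0.9", "en-GB,en;q=0.8"]),
        },
        "delay_seconds": random.uniform(min_delay, max_delay),
    }
```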
Handling PDFs
Several government sources publish price data as PDF reports. We built a specialized pipeline:
import tempfile
from typing import List

import camelot

def extract_prices_from_pdf(pdf_bytes: bytes) -> List[PriceRecord]:
    # camelot reads from a file path, so spill the bytes to a temp file
    with tempfile.NamedTemporaryFile(suffix='.pdf') as tmp:
        tmp.write(pdf_bytes)
        tmp.flush()
        # Extract all tables from the document
        tables = camelot.read_pdf(tmp.name, pages='all')

    # Identify price tables using structural patterns
    price_tables = [t for t in tables if is_price_table(t)]

    # Parse with source-specific logic
    records = []
    for table in price_tables:
        records.extend(parse_price_table(table))
    return records
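The `is_price_table` filter does real work here. A heuristic version might look like the following, operating on a plain cell grid rather than camelot's Table object for clarity; the keyword list and thresholds are assumptions, not the production rules:

```python
import re

PRICE_HEADER_WORDS = {"price", "min", "max", "avg", "usd", "rate"}

def is_price_table(rows: list) -> bool:
    """Heuristic: the header row mentions price-like words and at
    least one column of the body is mostly numeric."""
    if len(rows) < 2:
        return False
    header = [str(c).strip().lower() for c in rows[0]]
    if not any(w in cell for cell in header for w in PRICE_HEADER_WORDS):
        return False
    num_re = re.compile(r"^[\d,]+(\.\d+)?$")
    for col in range(len(rows[0])):
        cells = [str(r[col]).replace(" ", "") for r in rows[1:] if col < len(r)]
        if cells and sum(bool(num_re.match(c)) for c in cells) / len(cells) > 0.8:
            return True
    return False
```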
Normalization Challenges
Unit Conversion
Prices came in every unit imaginable:
- $/bushel, €/tonne, ₹/quintal
- FOB, CIF, ex-warehouse
- Spot, forward, average
We built a conversion engine with explicit handling for each case:
class PriceNormalizer:
    def normalize(self, raw_price: RawPrice) -> NormalizedPrice:
        # Convert to standard units (USD/MT)
        value_usd_mt = self.convert_currency(
            self.convert_weight(raw_price.value, raw_price.unit),
            raw_price.currency
        )

        # Adjust for delivery terms
        value_adjusted = self.adjust_for_incoterms(
            value_usd_mt,
            raw_price.delivery_terms
        )

        return NormalizedPrice(
            value=value_adjusted,
            source=raw_price.source,
            timestamp=raw_price.timestamp,
            confidence=self.calculate_confidence(raw_price)
        )
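The weight conversion largely reduces to a factor table, with the wrinkle that a bushel is commodity-specific (60 lb for wheat, 56 lb for corn under standard US test weights), which is exactly why each case needed explicit handling. A simplified sketch:

```python
LB_PER_KG = 2.20462

# kg per pricing unit; bushels are keyed per commodity
KG_PER_UNIT = {
    "mt": 1000.0,
    "quintal": 100.0,                  # 1 quintal = 100 kg
    "bushel_wheat": 60 / LB_PER_KG,    # 60 lb test weight
    "bushel_corn": 56 / LB_PER_KG,     # 56 lb test weight
}

def convert_weight(value: float, unit: str) -> float:
    """Convert a price per <unit> to the equivalent price per metric tonne."""
    kg = KG_PER_UNIT[unit]
    return value * (1000.0 / kg)
```

For example, wheat at $6.00/bushel works out to roughly $220/MT.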
Data Quality
Not all sources are created equal. We implemented a quality scoring system:
- Timeliness: How recent is the data?
- Consistency: Does it align with historical patterns?
- Completeness: Are all expected fields present?
- Cross-validation: Does it match other sources?
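These dimensions can be combined into a single per-source score; one simple approach is a weighted sum over [0, 1] sub-scores. The weights below are illustrative, not the production values:

```python
def quality_score(timeliness: float, consistency: float,
                  completeness: float, cross_validation: float) -> float:
    """Each component is pre-scored in [0, 1]; weights are illustrative."""
    weights = {
        "timeliness": 0.25,
        "consistency": 0.30,
        "completeness": 0.20,
        "cross_validation": 0.25,
    }
    components = {
        "timeliness": timeliness,
        "consistency": consistency,
        "completeness": completeness,
        "cross_validation": cross_validation,
    }
    return sum(weights[k] * components[k] for k in weights)
```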
Analytics Layer
With clean, normalized data, we built analytics capabilities:
Price Spreads
Track arbitrage opportunities across markets:
SELECT
    a.commodity,
    a.market AS market_a,
    b.market AS market_b,
    b.price - a.price AS spread,
    (b.price - a.price) / a.price * 100 AS spread_pct
FROM prices a
JOIN prices b
  ON a.commodity = b.commodity
 AND a.timestamp = b.timestamp
WHERE a.market != b.market
ORDER BY spread_pct DESC;
Anomaly Detection
Flag unusual price movements for investigation:
import pandas as pd

def detect_anomalies(prices: pd.Series) -> pd.Series:
    # Rolling statistics over a 20-observation window
    rolling_mean = prices.rolling(window=20).mean()
    rolling_std = prices.rolling(window=20).std()

    # Flag points more than 3 standard deviations from the rolling mean
    z_scores = (prices - rolling_mean) / rolling_std
    return z_scores.abs() > 3
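A quick smoke test of the z-score logic, written self-contained with the window parameterized so a short synthetic series works:

```python
import pandas as pd

def detect_anomalies(prices: pd.Series, window: int = 20) -> pd.Series:
    rolling_mean = prices.rolling(window=window).mean()
    rolling_std = prices.rolling(window=window).std()
    z_scores = (prices - rolling_mean) / rolling_std
    return z_scores.abs() > 3

# 30 flat-ish observations with one injected spike at index 25
prices = pd.Series([100.0 + (i % 3) for i in range(30)])
prices.iloc[25] = 160.0
flags = detect_anomalies(prices, window=20)
# the spike at index 25 is the only flagged point
```

Note that the first `window - 1` points yield NaN z-scores, which compare as False; in production those warm-up points were simply treated as unscored.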
Results
After 6 months of operation:
- 47 sources automated (up from 12 manual)
- 15,000+ price points collected daily
- 98.5% data quality score
- Time to insight cut from 4 hours to 15 minutes
Key Learnings
- Source reliability varies wildly. Build monitoring and fallbacks from day one.
- Normalization is the hard part. Collection is straightforward compared to making data comparable.
- Document everything. When a source changes format (they will), you need to understand the original logic.
- Invest in validation. Automated quality checks caught dozens of issues before they reached users.
Data pipelines are never "done." They require ongoing maintenance, monitoring, and adaptation as sources evolve.