Data Engineering · January 2024 · 10 min read

Building an Automated Price Intelligence Pipeline from Public Data

Scraping, normalizing, and analyzing commodity prices across 47 global sources — without getting blocked or losing data integrity.

The Business Need

A commodity trading firm needed real-time visibility into global pricing across agricultural products. They were manually tracking prices from dozens of sources—government databases, exchange feeds, industry publications, and regional markets.

Our mandate: automate the collection, normalize the data, and surface actionable insights.

The Challenge

47 data sources across 12 countries, each with its own:

  • Data format (HTML tables, PDFs, APIs, Excel downloads)
  • Update frequency (real-time to monthly)
  • Language and currency
  • Quality characteristics
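Per-source variance like this tends to end up in a config object that drives the rest of the pipeline. The sketch below is illustrative, not the pipeline's actual schema:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class SourceConfig:
    """Per-source settings consumed by a collector (illustrative fields)."""
    name: str
    url: str
    fmt: str               # "html", "pdf", "api", or "xlsx"
    update_frequency: str  # "realtime", "daily", "weekly", "monthly"
    language: str          # ISO 639-1 code, e.g. "hi"
    currency: str          # ISO 4217 code, e.g. "INR"
    rate_limit: int        # max requests per minute

config = SourceConfig(
    name="example-board",
    url="https://example.org/prices",
    fmt="html",
    update_frequency="daily",
    language="en",
    currency="USD",
    rate_limit=6,
)
```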

Architecture Overview

[Data Sources] → [Collectors] → [Raw Store] → [Normalizers] → [Clean Store] → [Analytics]
                      ↓              ↓              ↓               ↓
                 [Scheduler]    [S3/Parquet]   [Validation]    [PostgreSQL]

Collection Layer

We built source-specific collectors that handle the unique requirements of each data source:

from abc import ABC, abstractmethod

class CollectorBase(ABC):
    def __init__(self, source_config):
        self.config = source_config
        self.rate_limiter = RateLimiter(
            requests_per_minute=source_config.rate_limit
        )

    async def collect(self) -> RawData:
        async with self.rate_limiter:
            response = await self.fetch()
            return self.parse(response)

    @abstractmethod
    async def fetch(self) -> Response:
        pass

    @abstractmethod
    def parse(self, response: Response) -> RawData:
        pass
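The base class assumes a `RateLimiter` that works as an async context manager. The production implementation isn't shown here; a minimal sketch that simply spaces requests evenly across the minute might look like this:

```python
import asyncio
import time

class RateLimiter:
    """Minimal async rate limiter: spaces requests evenly across a minute."""

    def __init__(self, requests_per_minute: int):
        self.interval = 60.0 / requests_per_minute
        self._last = 0.0
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        async with self._lock:
            now = time.monotonic()
            wait = self._last + self.interval - now
            if wait > 0:
                await asyncio.sleep(wait)
            self._last = time.monotonic()
        return self

    async def __aexit__(self, *exc):
        return False

# quick demo: three acquisitions at 600 req/min must span roughly 0.2 s
async def _demo():
    limiter = RateLimiter(requests_per_minute=600)
    start = time.monotonic()
    for _ in range(3):
        async with limiter:
            pass
    return time.monotonic() - start

elapsed = asyncio.run(_demo())
```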

Avoiding Detection

Web scraping at scale requires careful attention to anti-bot measures:

  1. Respectful rate limiting: We never exceeded the request rate a human user browsing the site would generate
  2. Request variation: Randomized headers, timing, and user agents
  3. IP rotation: Pool of residential proxies for sensitive sources
  4. Session management: Maintained realistic session patterns
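Points 1 and 2 can be sketched as a function that draws varied request parameters for each fetch. The header pool and delay range below are placeholders, not the production values:

```python
import random

def request_profile(rng: random.Random) -> dict:
    """Draw a varied set of request parameters: rotated user agent,
    varied Accept-Language, and a human-scale jittered delay."""
    user_agents = ["agent-a", "agent-b", "agent-c"]  # stand-ins for real browser UA strings
    return {
        "headers": {
            "User-Agent": rng.choice(user_agents),
            "Accept-Language": rng.choice(["en-US,en;q=0.9", "en-GB,en;q=0.8"]),
        },
        "delay_s": rng.uniform(2.0, 8.0),  # gap before the next request
    }

profile = request_profile(random.Random(0))
```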

Most importantly: we only scraped public data that sources intended to be accessible.

Handling PDFs

Several government sources publish price data as PDF reports. We built a specialized pipeline:

import tempfile
from typing import List

import camelot

def extract_prices_from_pdf(pdf_bytes: bytes) -> List[PriceRecord]:
    # camelot reads from a file path, so spill the bytes to a temp file first
    with tempfile.NamedTemporaryFile(suffix=".pdf") as tmp:
        tmp.write(pdf_bytes)
        tmp.flush()
        tables = camelot.read_pdf(tmp.name, pages='all')

    # Identify price tables using structural patterns
    price_tables = [t for t in tables if is_price_table(t)]

    # Parse with source-specific logic
    records = []
    for table in price_tables:
        records.extend(parse_price_table(table))

    return records
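`is_price_table` and `parse_price_table` are source-specific and not shown. For the former, one plausible heuristic (assuming camelot-style tables, which expose a pandas DataFrame via `.df`) is to check for a price-like header and a mostly numeric body:

```python
import pandas as pd

PRICE_KEYWORDS = ("price", "min", "max", "modal", "rate")

def is_price_table(table) -> bool:
    """Heuristic: the header row mentions a price-like word and the body
    is mostly numeric. Assumes `table.df` is a DataFrame whose first row
    holds the column headers, as with camelot's Table objects."""
    df = table.df
    header = " ".join(str(cell).lower() for cell in df.iloc[0])
    if not any(word in header for word in PRICE_KEYWORDS):
        return False
    body = df.iloc[1:]
    numeric = body.apply(lambda col: pd.to_numeric(col, errors="coerce")).notna()
    return bool(numeric.to_numpy().mean() > 0.5)
```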

Normalization Challenges

Unit Conversion

Prices came in every unit imaginable:

  • $/bushel, €/tonne, ₹/quintal
  • FOB, CIF, ex-warehouse
  • Spot, forward, average

We built a conversion engine with explicit handling for each case:

class PriceNormalizer:
    def normalize(self, raw_price: RawPrice) -> NormalizedPrice:
        # Convert to standard units (USD/MT)
        value_usd_mt = self.convert_currency(
            self.convert_weight(raw_price.value, raw_price.unit),
            raw_price.currency
        )

        # Adjust for delivery terms
        value_adjusted = self.adjust_for_incoterms(
            value_usd_mt,
            raw_price.delivery_terms
        )

        return NormalizedPrice(
            value=value_adjusted,
            source=raw_price.source,
            timestamp=raw_price.timestamp,
            confidence=self.calculate_confidence(raw_price)
        )
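`convert_weight` hides most of the work. A minimal sketch follows; the unit names and factors are illustrative, and since bushel weight is commodity-specific, the 60 lb/bu figure assumes wheat:

```python
LB_PER_KG = 2.2046226218

# Factors expressing one quoted unit in metric tonnes.
TO_MT = {
    "MT": 1.0,
    "kg": 0.001,
    "quintal": 0.1,                         # 100 kg
    "bushel_wheat": 60 / LB_PER_KG / 1000,  # 60 lb per bushel (wheat)
}

def convert_weight(price: float, unit: str) -> float:
    """Convert a price quoted per `unit` into a price per metric tonne."""
    if unit not in TO_MT:
        raise ValueError(f"unknown unit: {unit}")
    return price / TO_MT[unit]
```

For example, wheat at $6.00/bushel comes out near $220/MT.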

Data Quality

Not all sources are created equal. We implemented a quality scoring system:

  • Timeliness: How recent is the data?
  • Consistency: Does it align with historical patterns?
  • Completeness: Are all expected fields present?
  • Cross-validation: Does it match other sources?
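One simple way to combine the four dimensions is a weighted blend. The weights below are illustrative, not the production values:

```python
def quality_score(timeliness: float, consistency: float,
                  completeness: float, cross_validation: float) -> float:
    """Blend the four quality dimensions (each scored in [0, 1]) into
    a single score. Weights are illustrative and sum to 1.0."""
    dims = (timeliness, consistency, completeness, cross_validation)
    if not all(0.0 <= d <= 1.0 for d in dims):
        raise ValueError("each dimension must be scored in [0, 1]")
    weights = (0.30, 0.25, 0.20, 0.25)
    return sum(w * d for w, d in zip(weights, dims))
```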

Analytics Layer

With clean, normalized data, we built analytics capabilities:

Price Spreads

Track arbitrage opportunities across markets:

SELECT
    a.commodity,
    a.market AS market_a,
    b.market AS market_b,
    b.price - a.price AS spread,
    (b.price - a.price) / a.price * 100 AS spread_pct
FROM prices a
JOIN prices b ON a.commodity = b.commodity
    AND a.timestamp = b.timestamp
WHERE a.market != b.market
ORDER BY spread_pct DESC

Anomaly Detection

Flag unusual price movements for investigation:

import pandas as pd

def detect_anomalies(prices: pd.Series) -> pd.Series:
    # Calculate rolling statistics over the trailing 20 observations
    rolling_mean = prices.rolling(window=20).mean()
    rolling_std = prices.rolling(window=20).std()

    # Flag points outside 3 standard deviations; the first 19 points
    # lack a full window, so their NaN z-scores are never flagged
    z_scores = (prices - rolling_mean) / rolling_std
    return z_scores.abs() > 3
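As a quick sanity check on synthetic data, an injected spike should be the only flagged point. The function is repeated here so the snippet runs standalone:

```python
import numpy as np
import pandas as pd

def detect_anomalies(prices: pd.Series) -> pd.Series:
    rolling_mean = prices.rolling(window=20).mean()
    rolling_std = prices.rolling(window=20).std()
    z_scores = (prices - rolling_mean) / rolling_std
    return z_scores.abs() > 3

# gentle oscillation around 100 with one injected spike at index 60
prices = pd.Series(100.0 + np.sin(np.arange(100)))
prices.iloc[60] = 150.0

flags = detect_anomalies(prices)
```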

Results

After 6 months of operation:

  • 47 sources automated (up from 12 manual)
  • 15,000+ price points collected daily
  • 98.5% data quality score
  • 4 hours → 15 minutes time to insight

Key Learnings

  1. Source reliability varies wildly. Build monitoring and fallbacks from day one.

  2. Normalization is the hard part. Collection is straightforward compared to making data comparable.

  3. Document everything. When a source changes format (they will), you need to understand the original logic.

  4. Invest in validation. Automated quality checks caught dozens of issues before they reached users.


Data pipelines are never "done." They require ongoing maintenance, monitoring, and adaptation as sources evolve.


TAKKA LABS

Engineering Team
