Deduplicating Company Records in a Multi-Source Data Centralization Project using dbt and Google BigQuery or Snowflake

Mark Rittman

One of the most common tasks in a data centralization project is to create single, deduplicated records for each of the companies, contacts, products and other entities the business interacts with.

Doing this allows you to connect sales activity from Salesforce and Hubspot to project delivery and invoicing data from Jira and Xero, for example:

[Image: cross-company.png]

Doing this in practice, however, can get confusing and complicated quickly, as a recent thread on the dbt Slack forum discussed. To summarise the complications this adds to a typical data centralization project:

  • Assuming you’re going to create a single record for each company, contact, product and so on, how do you store the source system IDs for those entities in such a way that you can link the incoming deals, orders, invoices and payments from those source systems to these new records?

  • What happens if some sources of company data don’t provide all of the fields that the main source provides? A CRM source such as HubSpot will typically provide dozens of data fields for each company, whereas others may only provide a name and billing address.

  • What if one source has the concept of a company or organisation in how it works but doesn’t provide a table of companies in its data extract? For example, when you’re working with Jira as a data source and each project is named after one of your clients, there are only project and issue tables in the data export and no explicit “company” table.

  • And what if two of the companies, contacts or products you need to deduplicate don’t naturally merge together on the name, code or other field you use for this purpose?

All of this is solvable with enough thinking, effort and coding, but doing so whilst keeping your project agile, delivering fast and avoiding a spaghetti mess of hacked-together code can be a challenge. Here’s how we do it on our own client data centralization projects, using Google BigQuery or Snowflake as the data warehousing platform, dbt (“Data Build Tool”) and our open-source RA Warehouse for dbt framework, available on GitHub as a public repo.

Step 1: Standardise Entity Staging Data Sources

Customers, contacts, projects and other shared dimensions are created automatically from all data sources, deduplicating by name and by merge lookup files, using a process that preserves source system keys whilst assigning a unique ID for each customer, contact and so on, as shown in the diagram below.

[Image: method.png]

Each dbt staging module (a folder within the dbt project) provides a unique ID, prefixed with the source name (HubSpot company 3423423 becomes “hubspot-3423423”, for example), and another field value (for example, the company name) that can be used for deduplicating dimension members downstream.

WITH source AS (
    {{ filter_stitch_relation(relation=var('stg_hubspot_crm_stitch_companies_table'),unique_column='companyid') }}
  ),
  renamed AS (
    SELECT
      CONCAT('{{ var('stg_hubspot_crm_id-prefix') }}',companyid) AS company_id,
      REPLACE(REPLACE(REPLACE(properties.name.value, 'Limited', ''), 'ltd', ''),', Inc.','') AS company_name,
      properties.address.value AS                   company_address,
      properties.address2.value AS                  company_address2,
      properties.city.value AS                      company_city,
      properties.state.value AS                     company_state,
      properties.country.value AS                   company_country,
      properties.zip.value AS                       company_zip,
      properties.phone.value AS                     company_phone,
      properties.website.value AS                   company_website,
      properties.industry.value AS                  company_industry,
      properties.linkedin_company_page.value AS     company_linkedin_company_page,
      properties.linkedinbio.value AS               company_linkedin_bio,
      properties.twitterhandle.value AS             company_twitterhandle,
      properties.description.value AS               company_description,
      CAST(NULL AS STRING) AS                       company_finance_status,
      CAST(NULL AS STRING) AS                       company_currency_code,
      properties.createdate.value AS                company_created_date,
      properties.hs_lastmodifieddate.value AS       company_last_modified_date
    FROM
      source
  )
SELECT
  *
FROM
  renamed

As long as a data source can provide an ID and a field by which you can connect its records to those from the other source systems, you can just pass null values for the fields that are missing, like this:

WITH source AS (
    {{ filter_stitch_relation(relation=var('stg_jira_projects_stitch_projects_table'),unique_column='id') }}
  ),
  renamed AS (
    SELECT * FROM (
      SELECT
        CONCAT('{{ var('stg_jira_projects_id-prefix') }}',REPLACE(name,' ','_')) AS company_id,
        name AS                       company_name,
        CAST(NULL AS STRING) AS       company_address,
        CAST(NULL AS STRING) AS       company_address2,
        CAST(NULL AS STRING) AS       company_city,
        CAST(NULL AS STRING) AS       company_state,
        CAST(NULL AS STRING) AS       company_country,
        CAST(NULL AS STRING) AS       company_zip,
        CAST(NULL AS STRING) AS       company_phone,
        CAST(NULL AS STRING) AS       company_website,
        CAST(NULL AS STRING) AS       company_industry,
        CAST(NULL AS STRING) AS       company_linkedin_company_page,
        CAST(NULL AS STRING) AS       company_linkedin_bio,
        CAST(NULL AS STRING) AS       company_twitterhandle,
        CAST(NULL AS STRING) AS       company_description,
        CAST(NULL AS STRING) AS       company_finance_status,
        CAST(NULL AS STRING) AS       company_currency_code,
        CAST(NULL AS TIMESTAMP) AS    company_created_date,
        CAST(NULL AS TIMESTAMP) AS    company_last_modified_date
      FROM
        source )
    {{ dbt_utils.group_by(n=19) }}
  )
SELECT
  *
FROM
  renamed

If you’re wondering what {{ filter_stitch_relation … }} refers to, it’s a dbt macro we use to filter out the historic (append-only) versions of rows you get when using Stitch to replicate SaaS data sources into Google BigQuery:

{%- macro filter_stitch_relation(relation, unique_column) -%}
SELECT
  *
FROM
  (
    SELECT
      *,
      MAX(_sdc_batched_at) OVER (PARTITION BY {{ unique_column }} ORDER BY _sdc_batched_at RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS max_sdc_batched_at
    FROM
      {{ relation }}
  )
WHERE
  _sdc_batched_at = max_sdc_batched_at
{%- endmacro -%}
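
To make that concrete, here’s roughly what the macro renders to for the HubSpot companies source; the Stitch target table name below is hypothetical:

SELECT
  *
FROM
  (
    SELECT
      *,
      MAX(_sdc_batched_at) OVER (PARTITION BY companyid ORDER BY _sdc_batched_at RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS max_sdc_batched_at
    FROM
      stitch_hubspot.companies -- hypothetical Stitch replication target table
  )
WHERE
  _sdc_batched_at = max_sdc_batched_at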

Step 2: Union Staging Data Sources Together

A variable we’ve created within dbt_project.yml lists all of the staging data sources that provide company records, named in a standard way so that prepending “stg” and appending “_staging” to the data source name gives the name of the corresponding module (folder) within the dbt project.

crm_warehouse_company_sources: ['hubspot_crm','harvest_projects','xero_accounting',
'stripe_payments','asana_projects','jira_projects']
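
For context, this sits under the vars: key of dbt_project.yml; a minimal sketch, with the surrounding project configuration omitted:

# dbt_project.yml (sketch -- other project keys omitted)
vars:
  crm_warehouse_company_sources: ['hubspot_crm','harvest_projects','xero_accounting',
                                  'stripe_payments','asana_projects','jira_projects']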

The “Integration” module for company data sources then uses the list of sources provided by this variable to drive a Jinja “for” loop that unions them together into a single CTE (common table expression):

with t_companies_pre_merged as (
    {% for source in var('crm_warehouse_company_sources') %}
      {% set relation_source = 'stg_' + source + '_companies' %}
      select
        '{{source}}' as source,
        *
        from {{ ref(relation_source) }}
        {% if not loop.last %}union all{% endif %}
      {% endfor %}
    )
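
For the first two sources in the list, the loop renders to SQL along these lines; the analytics dataset name here is assumed:

with t_companies_pre_merged as (
      select
        'hubspot_crm' as source,
        *
        from analytics.stg_hubspot_crm_companies      -- dataset name assumed
        union all
      select
        'harvest_projects' as source,
        *
        from analytics.stg_harvest_projects_companies
      -- ...and so on for the remaining sources
    )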

Step 3: Group and Deduplicate on a Common Field

This unioned-but-not-yet-deduplicated set of all company records from all sources is then grouped (deduplicated) on a field that contains the same value for the company across all the different sources, such as the company name; taking the MAX() of each remaining field means that a non-null value from any one source wins over the nulls supplied by sparser sources.

grouped as (
    SELECT
      company_name,
      MAX(company_phone) as company_phone,
      MAX(company_website) as company_website,
      MAX(company_industry) as company_industry,
      MAX(company_linkedin_company_page) as company_linkedin_company_page,
      MAX(company_linkedin_bio) as company_linkedin_bio,
      MAX(company_twitterhandle) as company_twitterhandle,
      MAX(company_description) as company_description,
      MAX(company_finance_status) as company_finance_status,
      MAX(company_currency_code) as company_currency_code,
      MIN(company_created_date) as company_created_date,
      MAX(company_last_modified_date) as company_last_modified_date
    FROM t_companies_pre_merged
    GROUP BY 1
  )

Step 4: Store IDs and Multi-Value Fields in Arrays

For fields where we need to record a number of values for each company, for example the various source system IDs we’ll need when linking invoices from Xero, deals from HubSpot and payments from Stripe, we make use of repeated columns when Google BigQuery is the target data warehouse, like this:

all_company_ids as (
             SELECT company_name, array_agg(distinct company_id ignore nulls) as all_company_ids
             FROM t_companies_pre_merged
             group by 1),

and repeated nested columns (arrays of structs) when storing multi-field groups of related values, for example the components of an address:

all_company_addresses as (
             SELECT company_name, array_agg(struct(company_address,
                                                   company_address2,
                                                   company_city,
                                                   company_state,
                                                   company_country,
                                                   company_zip)
                                            ignore nulls) as all_company_addresses
             FROM t_companies_pre_merged
             group by 1),

If the target warehouse is Snowflake then we achieve the same result by using the VARIANT data type to store the repeating fields as JSON:

all_company_ids as (
          SELECT company_name,
                 array_agg(
                    distinct company_id
                  ) as all_company_ids
            FROM t_companies_pre_merged
          group by 1),
      all_company_addresses as (
          SELECT company_name,
                 array_agg(
                      parse_json (
                        concat('{"company_address":"',company_address,
                               '", "company_address2":"',company_address2,
                               '", "company_city":"',company_city,
                               '", "company_state":"',company_state,
                               '", "company_country":"',company_country,
                               '", "company_zip":"',company_zip,'"} ')
                      )
                 ) as all_company_addresses
            FROM t_companies_pre_merged
          group by 1),
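
As an aside, Snowflake’s object_construct() function can build the same JSON objects without hand-concatenating strings (and without the quoting problems that invites); a minimal sketch of that alternative, reusing the CTE names above:

all_company_addresses as (
          SELECT company_name,
                 array_agg(
                   object_construct(          -- skips any key whose value is NULL
                     'company_address',  company_address,
                     'company_address2', company_address2,
                     'company_city',     company_city,
                     'company_state',    company_state,
                     'company_country',  company_country,
                     'company_zip',      company_zip
                   )
                 ) as all_company_addresses
            FROM t_companies_pre_merged
          group by 1),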

Step 5: Use a Lookup File for Manual Entity Merging

For dimensions where merging of members by name is not sufficient (for example, company names that cannot be relied on to always be spelt the same way across all sources) we add a seed file of manual lookups that map one member to another via their source system IDs, for example:

company_id,old_company_id
hubspot-3423423,xero-123121 
hubspot-2412121,stripe-214122
xero-123121,salesforce-12312412

and then extend the logic of the previous grouping step to make use of this merge file; the first row above, for instance, folds the member whose ID array contains “xero-123121” into the one containing “hubspot-3423423”. Here’s the version for BigQuery target warehouses:

from companies_pre_merged c
       -- gather, for each surviving member, its own source IDs plus those of any members merged into it
       left outer join (
            select company_name,
            ARRAY(SELECT DISTINCT x
                    FROM UNNEST(all_company_ids) AS x) as all_company_ids
            from (
                 select company_name, array_concat_agg(all_company_ids) as all_company_ids
                 from (
                      select * from (
                          select
                          c2.company_name as company_name,
                          c2.all_company_ids as all_company_ids
                          from   {{ ref('companies_merge_list') }} m
                          join companies_pre_merged c1 on m.old_company_id in UNNEST(c1.all_company_ids)
                          join companies_pre_merged c2 on m.company_id in UNNEST(c2.all_company_ids)
                          )
                      union all
                      select * from (
                          select
                          c2.company_name as company_name,
                          c1.all_company_ids as all_company_ids
                          from   {{ ref('companies_merge_list') }} m
                          join companies_pre_merged c1 on m.old_company_id in UNNEST(c1.all_company_ids)
                          join companies_pre_merged c2 on m.company_id in UNNEST(c2.all_company_ids)
                          )
                 )
                 group by 1
            )) m
       on c.company_name = m.company_name
       -- and finally filter out the members that were merged away into another member
       where c.company_name not in (
           select
           c2.company_name
           from   {{ ref('companies_merge_list') }} m
           join companies_pre_merged c2 on m.old_company_id in UNNEST(c2.all_company_ids)
         ))

The process is the same for Snowflake targets, albeit with slightly different SQL to reflect this platform’s approach to repeated nested columns:

-- as before, gather each surviving member's IDs plus those of the members merged into it
left outer join (
                      select company_name, array_agg(all_company_ids) as all_company_ids
                           from (
                             select
                               c2.company_name as company_name,
                               c2.all_company_ids as all_company_ids
                             from   {{ ref('companies_merge_list') }} m
                             join (
                               SELECT c1.company_name, c1f.value::string as all_company_ids from {{ ref('int_companies_pre_merged') }} c1,table(flatten(c1.all_company_ids)) c1f) c1
                             on m.old_company_id = c1.all_company_ids
                             join (
                               SELECT c2.company_name, c2f.value::string as all_company_ids from {{ ref('int_companies_pre_merged') }} c2,table(flatten(c2.all_company_ids)) c2f) c2
                             on m.company_id = c2.all_company_ids
                             union all
                             select
                               c2.company_name as company_name,
                               c1.all_company_ids as all_company_ids
                             from   {{ ref('companies_merge_list') }} m
                             join (
                               SELECT c1.company_name, c1f.value::string as all_company_ids from {{ ref('int_companies_pre_merged') }} c1,table(flatten(c1.all_company_ids)) c1f) c1
                               on m.old_company_id = c1.all_company_ids
                               join (
                                 SELECT c2.company_name, c2f.value::string as all_company_ids from {{ ref('int_companies_pre_merged') }} c2,table(flatten(c2.all_company_ids)) c2f) c2
                               on m.company_id = c2.all_company_ids
                             )
                       group by 1
                  ) m
             on c.company_name = m.company_name
             where c.company_name not in (
                 select
                 c2.company_name
                 from   {{ ref('companies_merge_list') }} m
                 join (SELECT c2.company_name, c2f.value::string as all_company_ids
                       from {{ ref('int_companies_pre_merged') }} c2,table(flatten(c2.all_company_ids)) c2f) c2
                       on m.old_company_id = c2.all_company_ids)

The deduplicated and grouped list of members, together with their multi-valued source ID fields and other field groups, and with any manual merges from the lookup file applied, is then joined together ready for loading into the warehouse dimension table; the example below is taken from the similarly-structured users dimension:

SELECT i.all_user_ids,
        u.*,
        e.all_user_emails
 FROM (
	SELECT user_name,
		MAX(contact_is_contractor) as contact_is_contractor,
		MAX(contact_is_staff) as contact_is_staff,
		MAX(contact_weekly_capacity) as contact_weekly_capacity,
		MAX(user_phone) as user_phone,
		MAX(contact_default_hourly_rate) as contact_default_hourly_rate,
		MAX(contact_cost_rate) as contact_cost_rate,
		MAX(contact_is_active) as contact_is_active,
		MAX(user_created_ts) as user_created_ts,
		MAX(user_last_modified_ts) as user_last_modified_ts
	FROM t_users_merge_list
	GROUP BY 1) u
JOIN user_emails e 
ON u.user_name = COALESCE(e.user_name,'Unknown')
JOIN user_ids i 
ON u.user_name = i.user_name

Step 6: Generate a Surrogate Key for the Dimension Table

Then, coming along at the last minute but taking all the glory, the warehouse dimension table model adds a unique identifier for each company record using the dbt_utils.surrogate_key macro, which renders to an MD5 hash of the supplied columns; here, just the company name.

WITH companies_dim as (
  SELECT
    {{ dbt_utils.surrogate_key(['company_name']) }} as company_pk,
    *
  FROM
    {{ ref('int_companies') }} c
)
select * from companies_dim

Step 7: Un-nest the Source ID Arrays for Fact Joins

Finally, when populating the fact tables that join to this dimension, you’ll need a way to join to the repeated (or VARIANT) columns that contain the array of source system IDs for each company, as shown in the screenshot below for Google BigQuery target warehouses.

[Image: array.png]

To do this when working with BigQuery as the target warehouse, we use the UNNEST() standard SQL function to flatten the nested values and then perform the required fact-to-dimension join, as in this example for our delivery projects fact table:

WITH delivery_projects AS
  (
  SELECT *
  FROM   {{ ref('int_delivery_projects') }}
),
  companies_dim as (
    SELECT {{ dbt_utils.star(from=ref('wh_companies_dim')) }}
    from {{ ref('wh_companies_dim') }}
  )
SELECT
	...
FROM
   delivery_projects p
     JOIN companies_dim c
       ON p.company_id IN UNNEST(c.all_company_ids)

Snowflake performs the flattening of the repeated source system ID field using its own syntax, producing one row per company per source ID so that the fact join becomes a simple equality; the equivalent SQL looks like this:

companies_dim as (
    SELECT c.company_pk, cf.value::string as company_id
    from {{ ref('wh_companies_dim') }} c,  
         table(flatten(c.all_company_ids)) cf
)
SELECT 
   ...
FROM
   delivery_projects p
     JOIN companies_dim c
       ON p.company_id = c.company_id

Interested? Find Out More

You can read more about our work with dbt, Google BigQuery, Snowflake and other modern data stack technologies on our website and blog.

How we handle deduplication and merging of company and contact data is just one of the design patterns we’ve included in our RA Warehouse for dbt framework, available as free-to-use, open-source code in a public repo on our GitHub site.

We’re a dbt Preferred Consulting Partner and this is one of the many ways we try to contribute value back to the dbt ecosystem. We welcome issue reports, pull requests and suggestions for other data centralization problems that it’d be cool for us to solve using this framework.

Or are you interested but don’t have the time to do it yourself? No problem! Click here to book a 100% free, no-obligation 30-minute call to discuss your data needs and how we could help get your data centralization initiative moving now.