Joining Algorithm in ETL Frameworks
Several tools can combine datasets and perform the different types of joins, namely:
- Inner
- Outer
- Left
- Right
Tools like Pandas and Spark, along with other ETL tools, do this job exceptionally well, apart from the databases themselves, of course.
However, a few years back, I was working on an ETL framework that required join-like features. In this article, I will discuss the idea behind the joining algorithm that I applied to solve that problem.
Before we dive into the algorithm, let's look at the datasets and try to understand the end goal.
Employees Data
id | name | dept_id | salary |
---|---|---|---|
1 | Justin | 101 | 7000.0 |
2 | Jacob | 102 | 3000.0 |
3 | Jolly | 103 | 3800.0 |
4 | Jatin | 102 | 4500.0 |
5 | Jacky | 101 | 5000.0 |
Department Data
dept_id | dept_name |
---|---|
101 | Engineering |
102 | Data Science |
103 | IT Operations |
Expected Data after join operation
id | name | dept_id | salary | dept_id_ | dept_name_ |
---|---|---|---|---|---|
1 | Justin | 101 | 7000.0 | 101 | Engineering |
2 | Jacob | 102 | 3000.0 | 102 | Data Science |
3 | Jolly | 103 | 3800.0 | 103 | IT Operations |
4 | Jatin | 102 | 4500.0 | 102 | Data Science |
5 | Jacky | 101 | 5000.0 | 101 | Engineering |
With this understanding, let's use the join module to demonstrate this functionality step by step.
Implementation
We are going to use two building blocks:
- DataSet : A class that wraps each row in a "namedtuple" and holds the whole chunk of data as a dataset.
- joiner : A function that implements our joining algorithm and actually joins the data.
Step 1 : Import DataSet, joiner from the module
from core.join import DataSet, joiner
Step 2 : Prepare the data sets
emp_header = ["id", "name", "dept_id", "salary"]
emp_data = [
    (1, "Justin", 101, 7000.00),
    (2, "Jacob", 102, 3000.00),
    (3, "Jolly", 103, 3800.00),
    (4, "Jatin", 102, 4500.00),
    (5, "Jacky", 101, 5000.00)
]
dept_header = ["dept_id", "dept_name"]
dept_data = [
    (101, "Engineering"),
    (102, "Data Science"),
    (103, "IT Operations")
]
emp_data_set = DataSet(emp_header, emp_data)
dept_data_set = DataSet(dept_header, dept_data)
Step 3 : Verify the datasets using count and show methods
DataSet provides helper methods such as count(), which prints the number of rows, and show(), which prints the dataset. Their implementation is covered later.
print("Employees count : ")
emp_data_set.count()
print("Employees data : ")
emp_data_set.show()
print("Department count :")
dept_data_set.count()
print("Department data :")
dept_data_set.show()
Output
Employees count :
5
Employees data :
+ | id=1 || name='Justin'|| dept_id=101 || salary=7000.0 | +
+ | id=2 || name='Jacob' || dept_id=102 || salary=3000.0 | +
+ | id=3 || name='Jolly' || dept_id=103 || salary=3800.0 | +
+ | id=4 || name='Jatin' || dept_id=102 || salary=4500.0 | +
+ | id=5 || name='Jacky' || dept_id=101 || salary=5000.0 | +
Department count :
3
Department data :
+ | dept_id=101 || dept_name='Engineering' | +
+ | dept_id=102 || dept_name='Data Science' | +
+ | dept_id=103 || dept_name='IT Operations' | +
Step 4 : Join the 2 datasets and create a joined dataset
# joining the datasets using the joiner method
joined_data_set = joiner(emp_data_set, dept_data_set, ["dept_id"])
Step 5 : Check the result for the joiner
print("Joined data count : ")
joined_data_set.count()
print("Joined data : ")
joined_data_set.show()
Output
Joined data count :
5
Joined data :
id=1 || name='Justin'|| dept_id=101 || salary=7000.0 || dept_id_=101 || dept_name_='Engineering'
id=5 || name='Jacky' || dept_id=101 || salary=5000.0 || dept_id_=101 || dept_name_='Engineering'
id=2 || name='Jacob' || dept_id=102 || salary=3000.0 || dept_id_=102 || dept_name_='Data Science'
id=4 || name='Jatin' || dept_id=102 || salary=4500.0 || dept_id_=102 || dept_name_='Data Science'
id=3 || name='Jolly' || dept_id=103 || salary=3800.0 || dept_id_=103 || dept_name_='IT Operations'
Detailed Implementation
Storing the data using DataSets with column definitions (type safety in future)
A DataSet is a collection of Row objects, each of which is a wrapper on top of the namedtuple from Python's collections module.
Row Object
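from collections import namedtuple
from typing import List, Optional

class Row:
    def __init__(self, header: List[str], row: Optional[tuple] = None) -> None:
        '''
        A wrapper on top of namedtuple to create a Row object.
        :param header - List of column names
        :param row - A tuple of row values [optional]
        '''
        self.header = header
        # Field names are lower-cased, so columns are read as row.<name>
        self.schema = namedtuple("Row", [col_name.lower() for col_name in header])
        if row:
            self.row = self.schema(*row)
        else:
            # No data supplied: fill every column with None
            self.row = self.schema(*[None] * len(self.header))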
Note: When the row argument is omitted, the Row is created with every value set to None.
Ex : Row(id=None, name=None)
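To make the namedtuple behaviour concrete, here is a minimal stand-alone sketch that builds the schema the same way Row does. The upper-case header values are made up for illustration, to show the effect of lower-casing the column names.

```python
from collections import namedtuple

# Illustrative header (assumed values); Row lower-cases names the same way.
header = ["ID", "Name"]
schema = namedtuple("Row", [col_name.lower() for col_name in header])

filled = schema(*(1, "Justin"))          # values supplied positionally
empty = schema(*[None] * len(header))    # no data: every field is None

print(filled)          # Row(id=1, name='Justin')
print(empty)           # Row(id=None, name=None)
print(filled.name)     # Justin
print(filled._fields)  # ('id', 'name')
```

Because a namedtuple is still a tuple, two rows can later be concatenated with `+`, which is what makes the join step simple.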
Next, we can write a helper method called show() that prints the row together with its column names in a tabular fashion.
    def __repr__(self) -> str:
        return self.show()

    def show(self) -> str:
        string = "+ "
        # Pad each value to at least the width of its column name
        field_width = [len(col) for col in self.header]
        for col_name, col_val, width in zip(self.row._fields, self.row, field_width):
            string += "| {i}={j!r:<{k}} |".format(i=col_name, j=col_val, k=width)
        return string + " +"
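The format string in show() packs a few things into one expression, so a small stand-alone sketch may help. The helper name `format_cell` is hypothetical and only isolates the formatting of a single column.

```python
# Hypothetical helper (not part of the module) that formats one column
# with the same format string show() uses.
def format_cell(col_name: str, col_val: object) -> str:
    # !r applies repr(), so strings keep their quotes;
    # :<{k} left-aligns the result and pads it to at least k characters.
    return "| {i}={j!r:<{k}} |".format(i=col_name, j=col_val, k=len(col_name))

print(format_cell("dept_id", 101))    # repr '101' is padded to width 7
print(format_cell("name", "Justin"))  # | name='Justin' |
```

Values whose repr is longer than the column name are not truncated, so the columns only line up when the values are short.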
Dataset
We will use the __generate_data_set method to build a list of Row objects from a plain list of tuples containing the data.
from datetime import date, datetime
from typing import List, Tuple, Union

class DataSet:
    def __init__(self, header: List[str], rows: List[Tuple[Union[int, str, date, datetime, float], ...]]) -> None:
        '''
        A wrapper on top of the Row class to create a complete dataset.
        :param header - List of column names
        :param rows - List of tuples of rows
        '''
        self.header = header
        self.rows = rows
        self.data_set = self.__generate_data_set()

    def __generate_data_set(self) -> List[Row]:
        '''
        Row object factory; generates a list of Row objects
        '''
        row_set = []
        for row in self.rows:
            # Every row must supply exactly one value per column
            if len(self.header) != len(row):
                raise ValueError(f"Columns mismatched expected : [{len(self.header)}] actual : [{len(row)}]")
            row_set.append(Row(self.header, row))
        return row_set
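The column-count check is the only validation performed while building the dataset. As a rough illustration, the stand-alone sketch below isolates that check; the function name `validate_rows`, the row position in the message, and the choice of ValueError are assumptions made for this example.

```python
from typing import List, Sequence, Tuple

# Hypothetical stand-alone version of the column-count check.
def validate_rows(header: List[str], rows: Sequence[Tuple]) -> None:
    for position, row in enumerate(rows):
        if len(row) != len(header):
            raise ValueError(
                f"Row {position}: columns mismatched, "
                f"expected [{len(header)}] actual [{len(row)}]"
            )

# A well-formed dataset passes silently.
validate_rows(["dept_id", "dept_name"], [(101, "Engineering")])

# A row with a missing value is rejected.
try:
    validate_rows(["dept_id", "dept_name"], [(101, "Engineering"), (102,)])
except ValueError as error:
    message = str(error)
    print(message)  # Row 1: columns mismatched, expected [2] actual [1]
```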
Once we have this implementation, we can add some helper methods such as count() and show(), just like Spark.
These methods let us check the number of rows and print the data in a pandas/Spark-like tabular manner.
    def size(self) -> int:
        '''
        Get the number of rows
        '''
        return len(self.data_set)

    def show(self) -> None:
        '''
        Prints the dataset, one row per line, followed by a blank line
        '''
        for row in self.data_set:
            print(row.show())
        print()

    def count(self) -> None:
        '''
        Prints the row count, followed by a blank line
        '''
        print(self.size())
        print()
Join Algorithm
Before we implement the last and core part, i.e. the joiner, let's look at a simple SQL statement that joins emp_data and dept_data using dept_id as the key.
select e.*, d.*
from emp_data e
inner join dept_data d
on e.dept_id = d.dept_id
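To see what this statement returns for our sample data, here is a small sketch that runs it against an in-memory SQLite database using Python's built-in sqlite3 module. The table definitions and the added `order by e.id` (used only to make the output order deterministic) are assumptions for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed table definitions matching the sample datasets
conn.execute("create table emp_data (id integer, name text, dept_id integer, salary real)")
conn.execute("create table dept_data (dept_id integer, dept_name text)")
conn.executemany(
    "insert into emp_data values (?, ?, ?, ?)",
    [
        (1, "Justin", 101, 7000.00),
        (2, "Jacob", 102, 3000.00),
        (3, "Jolly", 103, 3800.00),
        (4, "Jatin", 102, 4500.00),
        (5, "Jacky", 101, 5000.00),
    ],
)
conn.executemany(
    "insert into dept_data values (?, ?)",
    [(101, "Engineering"), (102, "Data Science"), (103, "IT Operations")],
)

rows = conn.execute(
    """
    select e.*, d.*
    from emp_data e
    inner join dept_data d
    on e.dept_id = d.dept_id
    order by e.id
    """
).fetchall()
conn.close()

for row in rows:
    print(row)  # first row: (1, 'Justin', 101, 7000.0, 101, 'Engineering')
```

Each result row is the employee columns followed by the department columns, which is the shape the joiner has to reproduce.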
Alright, now let's define the body of the function:
from itertools import product
from typing import List

def joiner(left_data_set: DataSet, right_data_set: DataSet, on: List[str]) -> DataSet:
    '''
    Performs inner join on datasets
    :param left_data_set - dataset with left data
    :param right_data_set - dataset with right data
    :param on - list of column names to join on
    Returns -> joined DataSet
    '''
    left_rows = left_data_set.data_set
    right_rows = right_data_set.data_set
    # Maps each join key to a pair of lists: (left rows, right rows)
    data_set = {}
    # Final list of joined rows, later converted to a DataSet
    joined_set = []
    # Group the left rows under their key, at index 0 of the pair
    for left_row in left_rows:
        # A tuple key keeps composite keys unambiguous: (1, 23) != (12, 3)
        left_key = tuple(getattr(left_row.row, key) for key in on)
        data_set.setdefault(left_key, ([], []))[0].append(left_row.row)
    # Group the right rows under their key, at index 1 of the pair
    for right_row in right_rows:
        right_key = tuple(getattr(right_row.row, key) for key in on)
        data_set.setdefault(right_key, ([], []))[1].append(right_row.row)
    # Cartesian product of left and right rows per key; a key missing on
    # either side yields no pairs, which gives inner-join semantics
    for ds_cols in data_set.values():
        for left_cols, right_cols in product(*ds_cols):
            joined_set.append(left_cols + right_cols)
    # Right-hand columns get a trailing underscore to avoid name clashes;
    # headers come from the datasets so empty inputs do not fail here
    result_headers = list(left_data_set.header) + [col + "_" for col in right_data_set.header]
    # Finally return the DataSet object built from the joined data
    return DataSet(result_headers, joined_set)
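The grouping idea is easier to follow without the Row and DataSet wrappers. The sketch below applies the same bucket-then-product approach to plain tuples; the function name `hash_join`, the index-based key parameters, and the unmatched sample rows ("Jill", "Finance") are assumptions made for illustration.

```python
from itertools import product
from typing import Dict, List, Sequence, Tuple

# Hypothetical stand-alone version of the grouping idea on plain tuples.
def hash_join(
    left: Sequence[tuple],
    right: Sequence[tuple],
    left_key_idx: List[int],
    right_key_idx: List[int],
) -> List[tuple]:
    # key -> (rows from the left side, rows from the right side)
    buckets: Dict[tuple, Tuple[list, list]] = {}
    for row in left:
        key = tuple(row[i] for i in left_key_idx)
        buckets.setdefault(key, ([], []))[0].append(row)
    for row in right:
        key = tuple(row[i] for i in right_key_idx)
        buckets.setdefault(key, ([], []))[1].append(row)
    joined = []
    for left_rows, right_rows in buckets.values():
        # An empty side makes the product empty, so unmatched keys drop out
        joined.extend(l + r for l, r in product(left_rows, right_rows))
    return joined

emp = [(1, "Justin", 101), (2, "Jacob", 102), (6, "Jill", 999)]
dept = [(101, "Engineering"), (102, "Data Science"), (104, "Finance")]
result = hash_join(emp, dept, [2], [0])
print(result)
# [(1, 'Justin', 101, 101, 'Engineering'), (2, 'Jacob', 102, 102, 'Data Science')]
```

Keys are kept as tuples in this sketch. Concatenating stringified values instead would make composite keys such as (1, 23) and (12, 3) collide, since both become "123".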
That's it. We now have a working inner join, and the implementation supports the following:
- Inner join
- Joining on multiple keys
- Representation of data in DataSets
However, we can further extend this algorithm to implement left, right and outer joins, and add type safety by introducing a Column type so that DataSets hold Rows of Columns.
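As one possible direction for the left join, the sketch below pads unmatched left rows with None values. It works on plain tuples, and the function name `left_join`, its parameters, and the unmatched sample row are all hypothetical; it is not the module's implementation.

```python
from typing import Dict, List, Sequence

# Hypothetical left-join sketch on plain tuples.
def left_join(
    left: Sequence[tuple],
    right: Sequence[tuple],
    left_key_idx: List[int],
    right_key_idx: List[int],
    right_width: int,
) -> List[tuple]:
    # Index the right side by key so each left row needs a single lookup
    lookup: Dict[tuple, List[tuple]] = {}
    for row in right:
        lookup.setdefault(tuple(row[i] for i in right_key_idx), []).append(row)
    # Stand-in for the right-hand columns when no match exists
    padding = (None,) * right_width
    joined = []
    for row in left:
        key = tuple(row[i] for i in left_key_idx)
        for match in lookup.get(key, [padding]):
            joined.append(row + match)
    return joined

emp = [(1, "Justin", 101), (6, "Jill", 999)]
dept = [(101, "Engineering")]
left_result = left_join(emp, dept, [2], [0], right_width=2)
print(left_result)
# [(1, 'Justin', 101, 101, 'Engineering'), (6, 'Jill', 999, None, None)]
```

A right join can reuse the same function with the arguments swapped, and a full outer join additionally needs the right rows whose keys never appeared on the left.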
If you want to improve this or add more functionality, I encourage you to contribute through this GitHub repo -> Github_dbjoin_anisriva
Finally, I'd like to conclude by noting that this is just an experimental exercise and, of course, there are better tools for performing this operation.