
Librarian: How Luno does big data

Neil Garb


Introduction

Luno loves data. In fact, using data in our decision-making processes is one of our Moontality values. Most engineering projects must first be motivated using data and are not concluded until the project’s impact on the data is measured.

Until the end of 2017, we were able to analyse customer activity using a read replica of our RDS MySQL master database. Early in 2018, we realised that this solution wasn't scalable: one database was no longer sufficient for storing all our data, and scaling the RDS MySQL databases or querying across multiple databases would be too cumbersome. We needed a solution that would let us aggregate data from disparate sources and analyse it quickly.

Data lake

We decided to establish a data lake. Luno is hosted entirely in AWS, so our plan was to use S3 buckets to store and catalogue the raw data that our various microservices generated. How data got into S3 would depend on the particular service. Our first data source was Messenger, an internal service that sends emails, SMSs and push notifications. In 2017, we refactored Messenger to write messages to S3 instead of MySQL in order to reduce load on our databases. This made it a natural point around which to build the rest of the data lake.


But moving messages out of MySQL and into S3 presented a problem: how could we maintain visibility into which messages were being sent to which customers? We couldn't search every file in the messages S3 bucket each time we needed to answer this question.

We decided to index messages by user. We wrote them first to S3 folders arranged by the message’s send time and then to S3 folders arranged by the recipient customer ID. This satisfied the immediate requirement: We could query a customer’s messages by reading just the files in that customer’s folder.

However, we knew this wouldn’t scale. Indexing messages for another query would require another duplicate of each message, and not all the queries we needed to run could be optimised in this way.

gRPC and protobufs

Another consideration was that the messages we were storing in S3 were serialised protobuf blobs. Luno uses gRPC for all inter-service RPC calls. We like it because it makes the contract between client and server explicit: Protobuf definition files define the RPC calls that a server exposes, as well as the request and response types. This is especially useful if the client and server are written in different languages.

When we refactored Messenger, we decided to write the message request protobuf to S3 exactly as it was received from the client. This simplified Messenger’s architecture, but meant that we couldn’t use tools such as Amazon Athena to query messages without first converting the protobuf blobs to CSV or JSON.

We needed to perform ad hoc queries on the data in the data lake but we didn’t want to put restrictions on the format of the data. We needed a data warehouse.

Data warehouse

Redshift is AWS’s data warehouse solution. It was the natural choice for aggregating data from different data sources into one RDBMS: We could ingest as much data as we needed to from our data lake while still being able to perform ad hoc queries using SQL.

The last thing we needed was a way to migrate data from the data lake into our Redshift data warehouse. We needed to build an ETL tool that would be aware of the data formats in our data lake, and would be able to transform that data into Redshift SQL tables for analysis.


Librarian

Librarian is the ETL service we built to bridge our S3 data lake and our Redshift data warehouse. Its purpose is to read raw, unindexed data from the data lake, apply various transformations to the data, and then write the data to our data warehouse. The service defines a pipeline per data set, which specifies the data source (e.g. an S3 bucket), the data format (e.g. protobuf), a transform (e.g. hash email address), and the output (e.g. Redshift):

type Pipeline struct {
	// The input reads raw bytes from the data lake.
	input input.Input
 
	// The input transform converts the raw bytes into protobuf messages.
	inputTransform input.Transform
 
	// The transform applies transformations to protobuf messages.
	transform transform.Transform
 
	// The output writes the transformed protobuf message.
	output output.Output
}

The input reads raw bytes from the data source. For S3, the pipeline remembers the last object key that was read and polls S3 periodically to check if there are any new keys since the last successfully processed one.
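The "new keys since the last processed one" check can be sketched as below, assuming keys compare lexicographically as S3 lists them (the function is illustrative, not Librarian's actual code):

```go
package main

import (
	"fmt"
	"sort"
)

// newKeys returns the S3 keys that sort strictly after lastProcessed,
// i.e. the objects the pipeline hasn't ingested yet.
func newKeys(keys []string, lastProcessed string) []string {
	sort.Strings(keys)
	i := sort.SearchStrings(keys, lastProcessed)
	if i < len(keys) && keys[i] == lastProcessed {
		i++ // skip the key we already processed
	}
	return keys[i:]
}

func main() {
	keys := []string{"2018/01/a.pb", "2018/02/c.pb", "2018/01/b.pb"}
	fmt.Println(newKeys(keys, "2018/01/b.pb")) // keys not yet processed
}
```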

The input transform parses the raw bytes returned by the input and converts them into protobuf messages. Each pipeline will decode the raw bytes into a different protobuf message. The input format doesn’t need to be protobuf – if it’s in a different format, then a conversion takes place.

The transform applies any required transformations to the input protobuf message. Most often, this will involve removing personally identifiable information (PII). If the transform needs to add fields to the protobuf message, Librarian defines a wrapper protobuf message with the new fields.
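For example, hashing an email address might look like the sketch below. The post doesn't name the hash function, so SHA-256 is an assumption:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashEmail replaces an email address with a hex-encoded SHA-256 digest,
// so the warehouse can still group and join on the value without the PII
// itself ever reaching Redshift.
func hashEmail(email string) string {
	sum := sha256.Sum256([]byte(email))
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(hashEmail("satoshi@example.com"))
}
```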

The output writes transformed protobuf messages to the pipeline’s output. In Redshift’s case, we write the protobuf messages to a holding S3 bucket as batches of JSON objects and then issue a COPY command to Redshift to ingest the data. We use JSONPaths to specify the mapping from JSON to Redshift columns.

copy target_table (col1, col2, …, coln)
from 'batch_s3_location'
format as json 'jsonpaths_s3_location';
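The batches themselves are newline-delimited JSON. A sketch of producing one such batch, with illustrative field names and a matching JSONPaths document (neither is Librarian's real schema):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// message is a hypothetical transformed record bound for Redshift.
type message struct {
	CustomerID int64  `json:"customer_id"`
	Channel    string `json:"channel"`
}

// jsonPaths tells COPY how to map each JSON field to a table column.
const jsonPaths = `{"jsonpaths": ["$.customer_id", "$.channel"]}`

// toJSONLines serialises a batch as newline-delimited JSON, the shape
// Redshift's COPY ... FORMAT AS JSON ingests from S3.
func toJSONLines(batch []message) (string, error) {
	var b strings.Builder
	for _, m := range batch {
		line, err := json.Marshal(m)
		if err != nil {
			return "", err
		}
		b.Write(line)
		b.WriteByte('\n')
	}
	return b.String(), nil
}

func main() {
	out, _ := toJSONLines([]message{{CustomerID: 4567, Channel: "email"}})
	fmt.Print(out)
	fmt.Println(jsonPaths)
}
```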


Worker architecture

Librarian’s pipeline inputs poll S3 in a loop. When a new object is found, its key and the pipeline name are pushed into an SQS queue. Workers read keys from the queue and run the corresponding pipeline on that object.
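The queue-and-workers flow can be sketched with a Go channel standing in for SQS (the job shape and names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// job mirrors what gets pushed onto the queue: the pipeline name plus
// the new object's key.
type job struct {
	Pipeline string
	Key      string
}

// processAll drains a queue of jobs with the given number of workers and
// counts how many objects each pipeline handled.
func processAll(jobs []job, workers int) map[string]int {
	queue := make(chan job, len(jobs))
	for _, j := range jobs {
		queue <- j
	}
	close(queue)

	counts := make(map[string]int)
	var mu sync.Mutex
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range queue { // each worker pulls the next job
				mu.Lock()
				counts[j.Pipeline]++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return counts
}

func main() {
	counts := processAll([]job{
		{"messages", "2018/01/a.pb"},
		{"messages", "2018/01/b.pb"},
	}, 3)
	fmt.Println(counts["messages"])
}
```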

We discovered shortly after deploying Librarian that Redshift doesn't tolerate multiple open transactions writing to the same tables: its serializable isolation aborts conflicting transactions. We therefore use a distributed mutex to ensure that two workers don't work on the same pipeline simultaneously.
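An in-process analogue of that per-pipeline lock (Librarian's real mutex is distributed, and the post doesn't say which backend it uses; this only illustrates the locking discipline):

```go
package main

import (
	"fmt"
	"sync"
)

// pipelineLocks hands out one mutex per pipeline name so that two workers
// never run the same pipeline concurrently.
type pipelineLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func (p *pipelineLocks) get(name string) *sync.Mutex {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.locks == nil {
		p.locks = make(map[string]*sync.Mutex)
	}
	l, ok := p.locks[name]
	if !ok {
		l = &sync.Mutex{}
		p.locks[name] = l
	}
	return l
}

func main() {
	var pl pipelineLocks
	l := pl.get("messages")
	l.Lock() // only one worker may run the "messages" pipeline at a time
	fmt.Println("worker holds the messages pipeline lock")
	l.Unlock()
}
```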


Idempotent imports

Librarian’s architecture is distributed. It needs to read from S3, write to Redshift and also track the last successfully processed S3 object in MySQL. In order to be resilient against unexpected errors, all Librarian operations are idempotent. This means that if an error occurs while processing a particular S3 object, the same S3 object can be reprocessed without any negative side effects.

Uniqueness, primary-key, and foreign-key constraints in Amazon Redshift are informational only – they are not enforced. Librarian solves this by ingesting data from the holding S3 bucket into a staging table, deleting all the records in the target table that are also in the staging table, and then copying unique records from the staging table into the target table.

create temporary table staging_table (like target_table);

copy staging_table (col1, col2, …, coln)
from 'batch_s3_location'
format as json 'jsonpaths_s3_location';

/* pkcoln are the primary key columns of the target table */
delete from target_table using staging_table
where target_table.pkcol1=staging_table.pkcol1
and target_table.pkcol2=staging_table.pkcol2
and …
and target_table.pkcoln=staging_table.pkcoln;

/* records are immutable, so rows sharing a primary key are
   identical and distinct is enough to deduplicate them */
insert into target_table
select distinct * from staging_table;

drop table staging_table;


Mutable vs immutable data

Librarian assumes that the data it's importing from S3 is immutable. The logic above depends on this: how else could one tell whether the records in the staging table were newer or older than the records in the target table? In Messenger's case, messages are never updated once sent, so it's safe to replace records in the target table with records from the staging table without worrying about which is newer.

We made a conscious decision to use Librarian only for immutable data sets. Mutable data sets must either be imported into Redshift using a different system or must be made immutable by, for example, writing an immutable record to the data lake for each individual mutation.
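One way to make a mutable data set immutable, as described: write one immutable change record per mutation and fold the records to recover the current value. The balance example below is purely illustrative, not a Luno data set:

```go
package main

import "fmt"

// balanceChange is a hypothetical immutable record written to the lake
// for each mutation of an account balance.
type balanceChange struct {
	AccountID int64
	Delta     int64
	Seq       int64 // monotonically increasing per account
}

// currentBalance folds the immutable change records back into the
// mutable value they describe.
func currentBalance(changes []balanceChange) int64 {
	var total int64
	for _, c := range changes {
		total += c.Delta
	}
	return total
}

func main() {
	changes := []balanceChange{
		{AccountID: 1, Delta: 100, Seq: 1},
		{AccountID: 1, Delta: -30, Seq: 2},
	}
	fmt.Println(currentBalance(changes))
}
```

Because each change record is written once and never updated, the set of records satisfies Librarian's immutability assumption even though the balance itself is mutable.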

Conclusion

Librarian has proven to be a reliable way to perform ETL on our data lake and make data from different sources available for ad hoc queries. We’re able to analyse messages using Redshift, despite the fact that they’re stored as binary protobuf blobs in S3. We’re in the process of rolling out more pipelines, including some that will securely onboard data from external partners into the Luno data lake. The generic definition of pipelines in Librarian allows us to do so: different inputs, transforms and outputs can be plugged together to suit particular use cases. For example, we’re able to combine an input that polls a third-party REST API with the same output that writes to Redshift.

Would you like to work on Librarian? We’re hiring across our engineering teams in London, Cape Town and Johannesburg.


Neil Garb

Neil is a software engineer at Luno. He holds a BSc degree in IT from the University of Cape Town. He has worked as a software developer in several sectors, including publishing, travel, marketing, social media and e-commerce at one of Africa's largest online retailers.
