Data Aggregation (Map, Filter, Reduce)

Data engineers think in batches!

Thinking in batches reminds me of a famous childhood story.

Once upon a time, a long, long time ago, there was a kind and gentle King. He ruled people beyond the horizon, and his subjects loved him.

One day, a tired-looking village man came to the King and said, “Dear King, help us. I am from a village beyond the horizon. It’s been raining for several days. My village chief asked me to fetch help from you before disaster strikes. It took me five days to walk to the Kingdom, and I am tired but glad that I could deliver this message to you.”

“I am glad that you came for help. I will send Suppandi, my loyal Chief of Defence, to assess the damage and then send help,” said the King. “Suppandi, you have your orders. Now, go. Assess the damage, report to me, and help,” ordered the King.

Suppandi left for the village beyond the horizon on his fastest horse. When he reached the village, it was flooded, and Suppandi felt the urge to return to the King quickly to inform him about the floods. So, he rode his horse harder and reached the Kingdom in 1/2 day. He went to the King and told him, “Dear King, the village is flooded. I went in a day and came back in 1/2 day to give you this information.”

Suppandi was pleased with himself. However, the King wanted more information. “Suppandi, please tell me whether the people in the village have food and whether any children are hurt. What more can we do to help?”

“I will find out, Dear King,” said Suppandi. He left again on his fastest horse. This time he reached the village in 1/2 day. He found that the people had no food and that many children were hurt and homeless. He raced back to the Kingdom. “Dear King, I reached in 1/2 day and came back in another 1/2. The villagers don’t have food to eat, and they are hungry. Several children are hurt and need medical attention,” said Suppandi.

This time the King had more questions. “Dear Suppandi, what did the village chief say? What can we do for him?”

“Dear King, I will find out. Let me leave for the village immediately,” said Suppandi.

Chanakya had been listening in on the conversation. He told Suppandi, “Dear Suppandi, you must be tired. Let me take over. Take some rest.”

Immediately, Chanakya ordered his men to collect food, water, clothes, and medicines, and to summon doctors. He asked for the fastest horses, and along with several men and the doctors, he left for the village beyond the horizon. When he reached it, the village was flooded, and people were on the terraces of their homes. He found several houses destroyed, hungry kids taking shelter under the trees, and many wounded villagers.

He ordered his men to rescue the villagers while skirting the floodwaters, to protect all the children, feed them, and take them to a safe place. He also asked the doctors to attend to the wounded.

The men built temporary homes outside the village to shelter the homeless. They waited for a few days for the rain and flood to subside. When it was bright and sunny, Chanakya, his men, and the villagers cleaned the village, rebuilt the homes, and stocked enough food and grain for six months before saying goodbye.

Chanakya reached the Kingdom and immediately reported to the King. The King was anxious. He said, “Chanakya, you were gone for two weeks with no message from you. I was worried. Did you speak to the village Chief?”

“Dear King, yes, on your behalf, I spoke to the village chief. I found that the village was flooded, so we rescued all the villagers, attended to the wounded, fed them, rebuilt their homes, and left food and grain for six months. The people have lost their belongings in the flood, but all of them are safe, and they have sent their wishes and blessings for your timely help,” said Chanakya.

The King was pleased. “Chanakya, I should have sent you earlier. You are a batch thinker! Thank you,” said the King.

Suppandi was disappointed. He had worked hard to ride to the village and report to the King as instructed, but Chanakya got all the praise. To this day, he still does not understand why, and is hurt.

Most non-data engineers are like Suppandi; they use programming constructs like “for,” “if,” “while,” and “do” on remote data. Most data engineers are like Chanakya; they use programming constructs like “map,” “filter,” “reduce,” and “forEach.” Programming with data is functional/declarative, while traditional programming is imperative.

There is nothing wrong with acting like Suppandi; he is the Chief of Defence. But some cases require Chanakya thinking. In architectural language, Suppandi’s actions move data to algorithms, and Chanakya’s actions move algorithms to data. The latter works better when there is a distance, and a cost to travel, between data and algorithms.

This difference in thinking is why data engineers use SQL while traditional engineers use C#/Java. SQL uses declarative commands that are sent to the database to pipeline a set of actions on the data. Conventional programming languages have caught up with the declarative paradigm by supporting lambda (arrow) functions and map/filter/reduce-style functions on data collections. These functions allow compilers/interpreters to leverage the underlying parallel compute backbone (the expensive eight-core CPU) or a set of inexpensive machines for parallel computing, abstracting parallelism away from the programmer. By programming declaratively, the programmer helps the compiler/interpreter identify speed-improvement opportunities.

Mapping

Instead of iterating over a collection one element at a time, map is a function that applies another function to all elements of a collection. The map function may split the collection into parts to distribute across cores/machines. The underlying collection remains immutable. In general, a mapping can be one-to-one, one-to-many, or many-to-one; it is the process of applying a relation (function) that maps an element in the domain to an element in the range. In computing, mapping does not change the size of the collection.

E.g., [1,2,-1,-2] => [1,4,1,4] using the squared relation is a many-to-one mapping

var numbers = [1, 2, -1, -2];
var x = numbers.map(x => x ** 2);
console.log(x);
[1,4,1,4]

E.g., [1,2,-1,-2] => [2,3,0,-1] using the plus-one relation is a one-to-one mapping

var numbers = [1, 2, -1, -2];
var x = numbers.map(x => x + 1);
console.log(x);
[2, 3, 0, -1]

E.g., [1,2,-1,-2] => [[2,1],[3,4],[0,1],[-1,4]] using the plus-one and squared relations is a one-to-many mapping

var numbers = [1, 2, -1, -2];
var x = numbers.map(x => [x + 1, x ** 2]);
console.log(x);
[[2, 1], [3, 4], [0, 1], [-1, 4]]
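When the one-to-many results should be flattened back into a single collection rather than nested arrays, JavaScript (ES2019+) also provides flatMap, which maps and then flattens one level — a quick sketch:

```javascript
var numbers = [1, 2, -1, -2];
// flatMap applies the function, then flattens the per-element arrays one level.
var x = numbers.flatMap(x => [x + 1, x ** 2]);
console.log(x);
// [2, 1, 3, 4, 0, 1, -1, 4]
```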

E.g., an SQL example of a one-to-one mapping

SELECT Upper(ContactName)
FROM Customers
MARIA ANDERS
ANA TRUJILLO
ANTONIO MORENO
THOMAS HARDY

Filtering

Instead of iterating over a collection one element at a time, filter is a function that returns the subset of elements matching given criteria. The filter function may split the collection into parts to distribute across cores/machines. The underlying collection remains immutable. Examples:

var numbers = [1, 2, -1, -2];
var x = numbers.filter(x => x > 0);
console.log(x);
[1, 2]
SELECT *
FROM Customers
WHERE Country='USA'

Number of Records: 13

CustomerID | CustomerName | ContactName | Address | City | PostalCode | Country
32 | Great Lakes Food Market | Howard Snyder | 2732 Baker Blvd. | Eugene | 97403 | USA
36 | Hungry Coyote Import Store | Yoshi Latimer | City Center Plaza 516 Main St. | Elgin | 97827 | USA
43 | Lazy K Kountry Store | John Steel | 12 Orchestra Terrace | Walla Walla | 99362 | USA
45 | Let’s Stop N Shop | Jaime Yorres | 87 Polk St. Suite 5 | San Francisco | 94117 | USA

Reduce

Instead of iterating over a collection one element at a time, reduce is a function that combines all elements of a collection into a single value using an accumulator function. The reduce function may split the collection into parts to distribute across cores/machines (which requires the combining function to be associative). The underlying collection remains immutable. Examples:

var numbers = [1, 2, -1, -2];
var x = numbers.reduce((sum,x) => sum + x, 0);
console.log(x);
0
SELECT count(*)
FROM Customers
Number of Records: 1
count(*)
91
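Reduce is not limited to sums; any combining function works. A sketch computing the maximum of the same collection:

```javascript
var numbers = [1, 2, -1, -2];
// The accumulator carries the largest value seen so far; -Infinity is the
// identity element, so the same code works on an empty collection.
var max = numbers.reduce((m, x) => (x > m ? x : m), -Infinity);
console.log(max);
// 2
```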

Pipelining

When multiple actions need to be performed on the data, the norm is to pipeline the actions. Examples:

var numbers = [1, 2, -1, -2];
var x = numbers
  .map(x => x + 1) //[2,3,0,-1]
  .filter(x => x > 0) //[2,3]
  .map(x => x ** 2) //[4,9]
  .reduce((sum, x) => sum + x, 0); //13
console.log(x);
13
SELECT Country, Upper(Country), count(*)
FROM Customers
WHERE Country LIKE 'A%'
GROUP BY Country
Number of Records: 2
Country Upper(Country) count(*)
Argentina ARGENTINA 3
Austria AUSTRIA 2

Takeaway

Data Engineers use Chanakya thinking to get work done in batches. Even streaming data is processed in mini-batches (windows). Actions on data are pipelined and expressed declaratively. The underlying compiler/interpreter abstracts away parallel computing (single device, multiple devices) from the programmer.

Think in Batches for Data.

Data Quality (Dirty vs. Clean)

Data quality is measured on a grayscale, not as a simple dirty/clean binary, and data quality engineers can continually improve it. Continual quality improvement is the process of pursuing data quality excellence.

Dirty data may refer to several things: Redundant, Incomplete, Inaccurate, Inconsistent, Missing Lineage, Non-analyzable, and Insecure.

  • Redundant: A Person’s address data may be redundant across data sources. So, the collection of data from these multiple data sources will result in duplicates.
  • Incomplete: A Person’s address record may not have Pin Code (Zip Code) information. There could also be cases where the data may be structurally complete but semantically incomplete.
  • Inaccurate: A Person’s address record may have the wrong city and state combination (E.g., [City: Mumbai, State: Karnataka], [City: Salt Lake City, State: California])
  • Inconsistent: A Person’s middle name in one record is different from the middle name in another record. Inconsistency happens due to redundancy.
  • Missing Lineage (and Provenance): A Person’s address record may not reflect the current address because the user has not updated it. Without lineage information (where the data came from and when), such freshness issues cannot be detected.
  • Non-analyzable: A Person’s email record may be encrypted.
  • Insecure: A Person’s bank account number is available but not accessible due to privacy regulations.

The opposite of Dirty is Clean. Cleansing data is the art of correcting data after it is collected. Commonly used techniques are enrichment, de-duplication, validation, meta-information capture, and imputation.

  1. Enrichment is a mitigation technique for incomplete data. A data engineer enriches a person’s address record by adding country information by mapping the (city, state) tuple to a country.
  2. De-Duplication is a mitigation technique for redundant data. The data system identifies and drops duplicates using data identities. Inconsistencies caused by redundancies require use-case-specific mitigations.
  3. Validation is a mitigation technique that applies domain rules to verify correctness. An email address can be verified for syntactical correctness using a regular expression such as \A[\w!#$%&'*+/=?`{|}~^-]+(?:\.[\w!#$%&'*+/=?`{|}~^-]+)*@(?:[A-Z0-9-]+\.)+[A-Z]{2,6}\Z (matched case-insensitively). Data may be accepted or rejected based on validations.
  4. Lineage and Provenance capture is a mitigation technique for data where source or freshness is critical. An image grouping application will require meta-data about an image series (video) collected like phone type and captured date.
  5. Imputation is a mitigation technique for incomplete data (data with information gaps due to poor collection techniques). A heartrate time-series data may be dirty with missing data in minutes 1 and 12. Using data with holes may lead to failures, so a data imputation may use the previous or next value to fill the gap.

These cleansing techniques reduce data dirtiness after data is collected. However, data dirtiness originates at creation time, collection time, and even correction time. So, a data cleansing process may not always result in fully clean data.

A great way to start with data quality is to describe the attributes of good-quality data and the related measures. Once we have a description of good-quality data, we incrementally/iteratively apply techniques like CAPA (Corrective Action, Preventive Action) as part of a continual quality improvement process. Once we are confident about data quality against the current measures, the data engineer can introduce new KPIs or set new targets for existing ones.

Example: A research study requires collecting stroke imaging data. A description of quality attributes would be:

Data Lineage & Provenance:
– Countries: {India}
– Imaging Types: {CT}
– Source: {Stroke Centers, Emergency}
– Method – Patient Position: supine
– Method – Scan extent: C2-to-vertex
– Method – Scan direction: caudocranial
– Method – Respiration: suspended
– Method – Acquisition type: volumetric
– Method – Contrast: {Non-contrast CT, PCT with contrast}

Redundancy:
Multiple scans of the same patient are acceptable but need to be separated by one week.

Completeness:
Each imaging scan should be accompanied by a radiology report that describes these features of the stroke:
– Time from onset: { early hyperacute (0-6H), late hyperacute (6-24H), acute (1-7D), sub-acute (1-3W), chronic (3W+) }
– CBV (Cerebral Blood Volume) in ml/100g of brain tissue
– CBF (Cerebral Blood Flow) in ml/min/100g of brain tissue
– Type of Stroke: {Hemorrhagic-Intracerebral, Hemorrhagic-Subarachnoid, Ischemic-Embolic, Ischemic-Thrombotic}

Accuracy:
Three reads of each image by separate radiologists to circumvent human error and bias. Anonymized patient history is sent to the radiologist.

Security and Privacy:
Patient PII is not leaked to the radiologist interpreting the result or the researcher analyzing the data.

Table: Data Quality Attributes

As you can see from the table of attributes for CT Stroke imaging data, the quality description is data-specific and use-specific.

Data engineers compute attribute-specific metrics from these attribute descriptions on a data sample to measure overall data quality. The attribute descriptions are the North Star for pursuing excellence in data quality.

Summary: Creation, collection, and correction improve over time when measured against defined criteria. There will always be data quality blind spots and leakages. Hence, data engineers report data quality on a grayscale with multiple attribute-specific metrics.

Streaming vs. Messaging

“We already have pub/sub messaging infrastructure in our platform. Why are you asking for a streaming infrastructure? Use our pub/sub messaging infrastructure.” – Platform Product Manager

Streaming and messaging systems are different, and their use cases are different.

Both streaming and messaging systems use the pub-sub pattern, with producers posting messages and consumers subscribing. The subscribed consumers may choose to poll or be notified: consumers in streaming systems generally poll the brokers, while in messaging systems the brokers push messages to consumers. Engineers use streaming systems to build data processing pipelines and messaging systems to build reactive services. Both systems support the standard delivery semantics (at-least-once, exactly-once, at-most-once). Brokers in streaming systems are deliberately “dumber” than those in messaging systems, which build routing and filtering intelligence into the broker. Streaming systems are faster than messaging systems precisely because of this lack of routing and filtering intelligence 🙂

Let’s look at the top three critical differences in detail:

#1: Data Structures

In streaming, the data structure is a stream, and in messaging, the data structure is a queue.

“Queue” is a FIFO (First In, First Out) data structure. Once a consumer consumes an element, it is removed from the queue, reducing the queue size. A consumer cannot fetch the “third” element from the queue; queues don’t support random access. E.g., a queue of people waiting to board a bus.

“Stream” is a data structure that is partitioned for distributed computing. If a consumer reads an element from a stream, the stream size does not shrink. The consumer can continue reading from its last read offset within the stream. Streams support random access; a consumer may seek to any reading offset. The brokers managing streams keep each consumer’s reading offset (like a bookmark in a book) and allow consumers to read from the beginning, the last read offset, a specific offset, or the latest. E.g., a video stream of a movie where each consumer resumes at a different offset.

In streaming systems, consumers refer to streams as Topics, and multiple consumers can subscribe to a topic simultaneously. In messaging systems, the administrator configures each queue to deliver messages either to one consumer or to many consumers; the latter pattern is also called a Topic and is used for notifications. A Topic in a streaming system is always a stream, while in a messaging system it is always a queue.

Both stream and queue data structures order the elements in a sequence, and the elements are immutable. These elements may or may not be homogeneous.

Queues can grow and shrink with publishers publishing and consumers consuming, respectively. Streams can grow with publishers publishing messages and do not shrink with consumers consuming. However, streams can be compacted by eliminating duplicates (on keys).

#2: Distributed (Cluster) Computing Topology

Since each element in a queue is consumed by exactly one consumer (the load-balancing pattern), the fetch must be from the central (master) node. The consumers may run on multiple nodes for distributed computing. The administrator configures the master broker node to store and forward data to other broker nodes for resiliency; however, this is a single-master, active-passive distributed computing paradigm.

In the notification (topic) pattern, multiple consumers on a queue can consume filtered content to process data in parallel. The administrator configures the master node to store and forward data to other broker nodes that serve consumers. The publishers publish to a single master/leader node, but consumers can consume from multiple nodes. This is the CQRS (Command Query Responsibility Segregation) pattern of distributed computing.

The streaming pattern is similar to the notification pattern w.r.t. distributed computing. Unlike messaging, partition keys break streams into shards/partitions, and the lead broker replicates these partitions to other brokers in the cluster. A leader-election process selects a broker as the leader/master for a given shard/partition, and the partition replicas serve multiple consumers in the CQRS pattern. The consumers read streams from the last offset, a specific offset, the beginning, or the latest.

If the leader fails, either a passive slave can take over, or the cluster elects a new leader from existing slaves.

#3: Routing and Content Filtering

In messaging systems, the brokers implement the concept of exchanges, where the broker can route the messages to different endpoints based on rules. The consumers can also filter content delivered to them at the broker level.

In streaming systems, the brokers do not implement routing or content filtering. Consumers may still filter content, but this is done by utility libraries on the consumer side after the broker has delivered the content.

Tabular Differences View

Category | Streaming | Messaging
Supports publish/subscribe paradigm | Yes | Yes
Polling vs. notification | Polling by consumers | Notification by brokers to consumers
Use case | Data processing pipelines | Reactive (micro)services
Delivery semantics supported | at-most-once, at-least-once, exactly-once | at-most-once, at-least-once, exactly-once
Intelligent broker | No | Yes
Data structure | Stream | Queue
Patterns | CQRS | Content-based routing/filtering; worker (LB) distribution; notification; CQRS
Data immutability | Yes | Yes
Data retention | Yes. Not deleted after delivery. | No. Deleted after delivery.
Data compaction | Yes. Key de-duplication. | N/A
Data homogeneity | Heterogeneous by default. Supports schema checks on data outside the broker. | Heterogeneous by default.
Speed | Faster than messaging | Slower than streaming
Distributed computing topology | Broker cluster with a single master per stream partition; consumers consume from multiple brokers, with data replicated across brokers | Broker cluster with a single master per topic/queue. Active-passive broker configuration for the load-balancing pattern. Data replicated across brokers for multiple-consumer distribution.
State/memory | Brokers remember each consumer’s bookmark (offset) in the stream | Consumers always consume from time-of-subscription (latest only)
Hub-and-spoke architecture | Yes | Yes
Vendors/services (examples) | Kafka, Azure Event Hub, AWS Kinesis | RabbitMQ, Azure Event Grid, AWS SQS/SNS
Domain model | A stream of GPS positions of a moving car | A queue to buy train tickets

Table of Differences between Streaming/Messaging Systems

Visual Differences View

Summary

Use the right tool for the job. Use messaging systems for event-driven services and streaming systems for distributed data processing.