Real-time insights: Telemetry Pipeline
0. Overview
0.1. Architecture
A telemetry pipeline is a system that collects, ingests, processes, stores, and analyzes telemetry data (metrics, logs, traces) from various sources in real-time or near real-time to provide insights into the performance and health of applications and infrastructure.
Figure 0: Barebone Telemetry Pipeline Architecture
It typically involves tools like Telegraf for data collection, Kafka for ingestion, Flink for processing, and Cassandra and VictoriaMetrics for storage and analysis.
Figure 1: Detailed Telemetry Pipeline Architecture
0.2. Stages
Collection: Telemetry data is collected from various sources using agents like Telegraf and Fluentd.
Ingestion: Data is ingested through message brokers such as Apache Kafka or Kinesis to handle high throughput.
Processing: Real-time processing is done using stream processing frameworks like Apache Flink for filtering, aggregating, and enriching data.
Storage and Analysis: Processed data is stored in systems like Cassandra, ClickHouse and Elasticsearch, and analyzed using tools like Grafana and Kibana for visualization and alerting.
1. Collection
1.1. Collection Agent
To start, we'll use Telegraf, a versatile open-source agent that collects metrics from various sources and writes them to different outputs. Telegraf supports a wide range of input and output plugins, making it easy to gather data from sensors, servers, GPS systems, and more.
Figure 2: Telegraf for collecting metrics & data
For this example, we'll focus on collecting the CPU temperature and fan speed from a macOS system using the exec plugin in Telegraf, leveraging the osx-cpu-temp command-line tool to fetch the readings.
🌵 Inlets allows devices behind firewalls or NAT to securely expose local services to the public internet by tunneling traffic through a public-facing Inlets server
1.2. Dependencies
Using Homebrew:
brew install telegraf
For other OS, refer: docs.influxdata.com/telegraf/v1/install.
Optionally, download the latest Telegraf release from: https://www.influxdata.com/downloads
Using Homebrew:
brew install osx-cpu-temp
Refer: github.com/lavoiesl/osx-cpu-temp
1.3. Events
Here's a custom script to get the CPU and Fan Speed:
#!/bin/bash
# Nanosecond timestamp: epoch seconds with 9 trailing zeros (line protocol expects ns)
timestamp=$(date +%s)000000000
hostname=$(hostname | tr "[:upper:]" "[:lower:]")
# CPU temperature: keep only the numeric part of osx-cpu-temp's output
cpu=$(osx-cpu-temp -c | sed -e 's/\([0-9.]*\).*/\1/')
# Fan lines like "Fan 0 - Left side at 1234 RPM (45%)" become "0,Left,1234,45"
fans=$(osx-cpu-temp -f | grep '^Fan' | sed -e 's/^Fan \([0-9]\) - \([a-zA-Z]*\) side *at \([0-9]*\) RPM (\([0-9]*\)%).*/\1,\2,\3,\4/')
echo "cpu_temp,device_id=$hostname temp=$cpu $timestamp"
for f in $fans; do
  side=$(echo "$f" | cut -d, -f2 | tr "[:upper:]" "[:lower:]")
  rpm=$(echo "$f" | cut -d, -f3)
  pct=$(echo "$f" | cut -d, -f4)
  echo "fan_speed,device_id=$hostname,side=$side rpm=$rpm,percent=$pct $timestamp"
done
Output Format: measurement,host=foo,tag=measure val1=5,val2=3234.34 1609459200000000000
The output follows the Line protocol syntax, where:
measurement is the "table" ("measurement" in InfluxDB terms) to which the metrics are written.
host=foo,tag=measure are tags you can group and filter by.
val1=5,val2=3234.34 are values, to display in graphs.
1716425990000000000 is the current Unix timestamp with 9 trailing zeros appended, i.e. a nanosecond timestamp.
Sample Output: cpu_temp,device_id=adeshs-mbp temp=0.0 1716425990000000000
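If it helps to see the format programmatically, here's a small Python sketch, not part of the pipeline, that assembles the same kind of line-protocol record (it deliberately ignores the protocol's escaping rules for spaces and commas):
import time

def to_line_protocol(measurement, tags, fields):
    # Simplified line-protocol builder: measurement,tags fields timestamp
    tag_str = ",".join(f"{k}={v}" for k, v in tags.items())
    field_str = ",".join(f"{k}={v}" for k, v in fields.items())
    ts_ns = time.time_ns()  # nanosecond timestamp, same idea as the script's 9 appended zeros
    return f"{measurement},{tag_str} {field_str} {ts_ns}"

# e.g. cpu_temp,device_id=adeshs-mbp temp=61.2 1716425990000000000
print(to_line_protocol("cpu_temp", {"device_id": "adeshs-mbp"}, {"temp": 61.2}))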
1.4. Configuration
The location of telegraf.conf when installed using Homebrew: /opt/homebrew/etc/telegraf.conf
Telegraf's configuration file is written using TOML and is composed of three sections: global tags, agent settings, and plugins (inputs, outputs, processors, and aggregators).
Once Telegraf collects the data, we need to transmit it to a designated endpoint for further processing. For this, we'll use the HTTP output plugin in Telegraf to send the data in JSON format to a Flask application (covered in the next section).
Below is what the telegraf.conf file looks like, with the exec input plugin (format: influx) and the HTTP output plugin (format: json).
[agent]
interval = "10s"
round_interval = true
metric_buffer_limit = 10000
flush_buffer_when_full = true
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = ""
debug = false
quiet = false
logfile = "/path to telegraf log/telegraf.log"
hostname = "host"
omit_hostname = false
[[inputs.exec]]
commands = ["/path to custom script/osx_metrics.sh"]
timeout = "5s"
name_suffix = "_custom"
data_format = "influx"
interval = "10s"
[[outputs.http]]
url = "http://127.0.0.1:5000/metrics"
method = "POST"
timeout = "5s"
data_format = "json"
[outputs.http.headers]
Content-Type = "application/json"
Edit telegraf.conf (use the above config):
vi /opt/homebrew/etc/telegraf.conf
🚧: Don't forget to explore tons of other input and output plugins: docs.influxdata.com/telegraf/v1/plugins
1.5. Start Capture
Run telegraf (when installed from Homebrew):
/opt/homebrew/opt/telegraf/bin/telegraf -config /opt/homebrew/etc/telegraf.conf
2. Ingestion
2.1. Telemetry Server
The telemetry server layer is designed to be lightweight. Its primary function is to authenticate incoming requests and publish raw events directly to Message Broker/Kafka. Further processing of these events will be carried out by the stream processing framework.
For our example, the Flask application serves as the telemetry server, acting as the entry point (via load-balancer) for the requests. It receives the data from a POST request, validates it, and publishes the messages to a Kafka topic.
Topic partitions are the unit of parallelism in Kafka. Choose a partition key (e.g., client_id) that evenly distributes records to avoid hotspots, and a number of partitions that achieves good throughput.
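As a hedged illustration of attaching a partition key with kafka-python (the key_serializer and the choice of device_id as the key are assumptions, not part of the Flask app shown later; the bootstrap server assumes the Docker Compose external listener on localhost:9094 used later in this post, a local Homebrew Kafka would be localhost:9092):
from kafka import KafkaProducer
import json

# Records with the same key always land on the same partition, so a
# well-distributed key (e.g. client_id / device_id) avoids hotspots.
producer = KafkaProducer(
    bootstrap_servers="localhost:9094",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {"name": "cpu_temp", "tags": {"device_id": "adeshs-mbp"}, "fields": {"temp": 61.2}}
producer.send("raw-events", key=event["tags"]["device_id"], value=event)
producer.flush()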
🚧 Message Broker Alternatives: Amazon Kinesis, Redpanda
2.2. Dependencies
Using PIP:
pip3 install Flask flask-cors kafka-python
For Local Kafka Set-up (Or use Docker from next sub-section):
Using Homebrew:
brew install kafka
Refer: Homebrew Kafka
Start Zookeeper:
zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties
Start Kafka:
brew services restart kafka
Create Topic:
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic learn
Usage: Kafka CLI
2.3. Docker Compose
To set up Kafka using Docker Compose, ensure Docker is installed on your machine by following the instructions on the Docker installation page. Once Docker is installed, create a docker-compose.yml for Kafka and Zookeeper:
version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.5
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.3.5
ports:
- "9092:9092" # Internal port
- "9094:9094" # External port
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
depends_on:
- zookeeper
kafka-topics-creator:
image: confluentinc/cp-kafka:7.3.5
depends_on:
- kafka
entrypoint: ["/bin/sh", "-c"]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:9092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic raw-events --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:9092 --list
"
schema-registry:
image: confluentinc/cp-schema-registry:7.3.5
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8085,http://localhost:8085
ports:
- 8085:8085
depends_on: [zookeeper, kafka]
Run docker-compose up to start the services (Kafka + Zookeeper).
2.4. Start Server
The Flask application includes a /metrics endpoint, matching the HTTP output configured in telegraf.conf. When data is sent to this endpoint, the Flask app receives the payload and publishes the message to Kafka.
New to Flask? Refer: Flask Quickstart
import os
from flask_cors import CORS
from flask import Flask, jsonify, request
from dotenv import load_dotenv
from kafka import KafkaProducer
import json
app = Flask(__name__)
cors = CORS(app)
load_dotenv()
producer = KafkaProducer(bootstrap_servers='localhost:9094',
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
@app.route('/metrics', methods=['POST'])
def process_metrics():
data = request.get_json()
print(data)
producer.send('raw-events', data)
return jsonify({'status': 'success'}), 200
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
Start all services 🚀:
Run Flask App (Telemetry Server):
flask run
Ensure telegraf is running (Refer: Section 1.5)
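Once Telegraf and the Flask app are running, a quick way to confirm events are landing in Kafka is a throwaway kafka-python consumer (a sketch assuming the Docker Compose external listener on localhost:9094 and the raw-events topic from this post):
from kafka import KafkaConsumer
import json

# Tail the raw-events topic from the beginning and print each payload
# published by the Flask telemetry server.
consumer = KafkaConsumer(
    "raw-events",
    bootstrap_servers="localhost:9094",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for message in consumer:
    print(message.partition, message.offset, message.value)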
3. Processing
3.1. Stream Processor
The Stream Processor is responsible for data transformation, enrichment, and stateful computations/updates over unbounded (push-model) and bounded (pull-model) data streams, and for sinking the enriched and transformed data to various data stores or applications. Key Features to Look for in a Stream Processing Framework:
Scalability and Performance: Scale by adding nodes, efficiently use resources, process data with minimal delay, and handle large volumes.
Fault Tolerance and Data Consistency: Ensure fault tolerance with state saving for failure recovery and exactly-once processing.
Ease of Use and Community Support: Provide user-friendly APIs in multiple languages, comprehensive documentation, and active community support.
Figure 3: Stateful Stream Processing
Integration and Compatibility: Seamlessly integrate with various data sources and sinks, and be compatible with other tools in your tech stack.
Windowing and Event Time Processing: Support various windowing strategies (tumbling, sliding, session) and manage late-arriving data based on event timestamps.
Security and Monitoring: Include security features like data encryption and robust access controls, and provide tools for monitoring performance and logging.
Although I have set the context to use Flink for this example, a word of caution:
☢️ Note: While Apache Flink is a powerful choice for stream processing due to its rich feature set, scalability, and advanced capabilities, it can be overkill for a lot of use cases, particularly those with simpler requirements and/or lower data volumes.
🚧 Open Source Alternatives: Apache Kafka Streams, Apache Storm, Apache Samza
3.2. Dependencies
Install PyFlink Using PIP:
pip3 install apache-flink==1.18.1
Usage examples: flink-python/pyflink/examples
For Local Flink Set-up: (Or use Docker from next sub-section)
Download Flink and extract the archive: www.apache.org/dyn/closer.lua/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
☢️ At the time of writing this post, Flink 1.18.1 is the latest stable version that supports the Kafka connector plugin.
Download the Kafka Connector and extract the archive: www.apache.org/dyn/closer.lua/flink/flink-connector-kafka-3.1.0/flink-connector-kafka-3.1.0-src.tgz
Copy/Move the flink-connector-kafka-3.1.0-1.18.jar to flink-1.18.1/lib ($FLINK_HOME/lib)
Ensure the Flink path is set:
export FLINK_HOME=/full-path/flink-1.18.1
(add to .bashrc / .zshrc)
Start Flink Cluster:
cd flink-1.18.1 && ./bin/start-cluster.sh
Flink dashboard at: localhost:8081
To stop the Flink cluster:
./bin/stop-cluster.sh
3.3. Docker Compose
Create a flink_init/Dockerfile file for Flink and the Kafka Connector:
FROM flink:1.18.1-scala_2.12
RUN wget -P /opt/flink/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka/3.1.0-1.18/flink-connector-kafka-3.1.0-1.18.jar
RUN chown -R flink:flink /opt/flink/lib
Add Flink to docker-compose.yml (in addition to Kafka, from Section 2.3):
version: '3.8'
services:
  jobmanager:
    build: flink_init/.
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    build: flink_init/.
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
Run docker-compose up to start the services (Kafka + Zookeeper, Flink).
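Tying the pieces together, here is a hedged PyFlink sketch of what a minimal job on the raw-events topic could look like: it parses the JSON batches forwarded by the Flask app, keys them by device, and keeps the maximum CPU temperature per device over 1-minute tumbling windows. The topic, port, and field names follow this post; the group id, window size, jar path, and the aggregation itself are assumptions, not a drop-in implementation.
import json
from pyflink.common import Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource
from pyflink.datastream.window import TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
# When running locally (outside the Docker images above), the Kafka connector jar
# must be on the classpath, e.g.:
# env.add_jars("file:///full-path/flink-connector-kafka-3.1.0-1.18.jar")

source = (KafkaSource.builder()
          .set_bootstrap_servers("localhost:9094")
          .set_topics("raw-events")
          .set_group_id("telemetry-processor")
          .set_starting_offsets(KafkaOffsetsInitializer.latest())
          .set_value_only_deserializer(SimpleStringSchema())
          .build())

def extract_cpu_temps(raw):
    # Telegraf's JSON output batches metrics as {"metrics": [...]}; the measurement
    # is named cpu_temp_custom because of the name_suffix in telegraf.conf.
    for m in json.loads(raw).get("metrics", []):
        if m.get("name", "").startswith("cpu_temp"):
            yield m["tags"].get("device_id", "unknown"), float(m["fields"]["temp"])

stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "raw-events")
(stream
 .flat_map(extract_cpu_temps, output_type=Types.TUPLE([Types.STRING(), Types.FLOAT()]))
 .key_by(lambda t: t[0])
 .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
 .reduce(lambda a, b: a if a[1] >= b[1] else b)  # max temperature per device per window
 .print())

env.execute("cpu-temp-window-max")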
4. Storage and Analysis
The code snippets stop here! The rest of the post covers key conventions, strategies, and factors for selecting the right data store, performing real-time analytics, and setting up alerts.
4.1. Datastore
When choosing the right database for telemetry data, it's crucial to consider several factors:
Read and Write Patterns: Understanding the frequency and volume of read and write operations is key. High write throughput and high read throughput call for different database optimizations and consistency trade-offs.
Data Amplification: Be mindful of how the data volume might grow over time (+Write Amplification) and how the database handles this increase without significant performance degradation.
Cost: Evaluate the cost implications, including storage, processing, and any associated services.
Analytics Use Cases: Determine whether the primary need is for real-time analytics, historical data analysis, or both.
Transactions: Consider the nature and complexity of transactions that will be performed, for example, batch write transactions.
Read and Write Consistency: Decide on the level of consistency required for the application. For example, OLTP (Online Transaction Processing) systems prioritize consistency and transaction integrity, while OLAP (Online Analytical Processing) systems are optimized for complex queries and read-heavy workloads.
🌵 LSM-Tree favors write-intensive applications.
For example, to decide between row-based vs columnar storage, or between OLTP (Online Transaction Processing), OLAP (Online Analytical Processing), and a hybrid approach:
Figure 4: Row vs Columnar Storage
Transactional and High Throughput Needs: For high write throughput and transactional batches (all or nothing), with queries needing wide-column family fetches and indexed queries within the partition, Cassandra/ScyllaDB is better suited.
Complex Analytical Queries: For more complex analytical queries, aggregations on specific columns, and machine learning models, data stores such as ClickHouse or Druid are more appropriate. Their optimized columnar storage and powerful query capabilities make them ideal for handling large-scale analytical tasks. Several others include VictoriaMetrics and InfluxDB (with an emphasis on time-series); closed-source options: Snowflake, BigQuery and Redshift.
Hybrid Approach: In scenarios requiring both fast write-heavy transactional processing and complex analytics, a common approach is to use Cassandra for real-time data ingestion and storage, and periodically perform ETL (Extract, Transform, Load) or CDC (Change Data Capture) processes to batch insert data into an OLAP DB for analytical processing. This leverages the strengths of both databases, ensuring efficient data handling and comprehensive analytical capabilities. Proper indexing and data modeling go without saying 🧐
🌵 Debezium: Distributed platform for change data capture (more on previous post).
Using an HTAP (Hybrid Transactional/Analytical Processing) database that's suitable for both transactional and analytical workloads is worth considering. Examples: TiDB, TimescaleDB (kind of).
While you get some of the best from both worlds 🌎, you also inherit a few of the worst from each!
Lucky for you, I have first hand experience with it 🤭:
Figure 5: Detailed comparison of OLTP, OLAP and HTAP
Analogy: Choosing the right database is like picking the perfect ride. Need pay-as-you-go flexibility? Grab a taxi. Tackling heavy-duty tasks? 🚜 Bring in the bulldozer. For everyday use, 🚗 a Toyota fits. Bringing a war tank to a community center is overkill. Sometimes, you need a fleet—a car for daily use, and a truck for heavy loads.
☢️ InfluxDB: Stagnant contribution graph, Flux deprecation, but new benchmarks!
4.2. Partition and Indexes
Without getting into too much detail, it's crucial to choose the right partitioning strategy (Ex: Range, List, Hash) to ensure partitions don't bloat and effectively support primary read patterns (in this context, example: client_id + region + 1st Day of Month).
Figure 6: Types of Indexes and Materialized view
Following this, clustering columns and indexes help organize data within partitions to optimize range queries and sorting. Secondary indexes (within the partition/local or across partitions/global) are valuable for query patterns where partition or primary keys don't apply. Materialized views precompute and store complex query results, speeding up read operations for frequently accessed data.
Figure 7: Partition Key, Clustering Keys, Local/Global Secondary Indexes and Materialized views
Multi-dimensional Index (Spatial/Spatio-temporal): Indexes such as B+ trees and LSM trees are not designed to directly store higher-dimensional data. Spatial indexing uses structures like R-trees and Quad-trees and techniques like geohash. Space-filling curves like Z-order (Morton) and Hilbert curves interleave spatial and temporal dimensions, preserving locality and enabling efficient queries.
Figure 8: Commonly Used: Types of Spatial Indexes
🌵 GeoMesa: spatio-temporal indexing on top of Accumulo, HBase, Redis, Kafka, PostGIS and Cassandra. XZ-Ordering: Customizing Index Creation.
Next blog post is all about spatial indexes!
4.3. Analytics and Alerts
Typically, analytics are performed as batch queries on bounded datasets of recorded events, requiring reruns to incorporate new data.
Figure 9: Analytics on Static, Relative and In-Motion Data
In contrast, streaming queries ingest real-time event streams, continuously updating results as events are consumed, with outputs either written to an external database or maintained as internal state.
Figure 10: Batch Analytics vs Stream Analytics
Feature | Batch Analytics | Stream Analytics |
Data Processing | Processes large volumes of stored data | Processes data in real-time as it arrives |
Result Latency | Produces results with some delay; near real-time results with frequent query runs | Provides immediate insights and actions |
Resource Efficiency | Requires querying the database often for necessary data | Continuously updates results in transient data stores without re-querying the database |
Typical Use | Ideal for historical analysis and periodic reporting | Best for real-time monitoring, alerting, and dynamic applications |
Complexity Handling | Can handle complex queries and computations | Less effective for highly complex queries |
Backfill | Easy to backfill historical data and re-run queries | Backfill can potentially introduce complexity |
🌵 Anomaly Detection and Remediation
🌵 MindsDB: Connect Data Source, Configure AI Engine, Create AI Tables, Query for predictions and Automate workflows.
5. References
1. Wikipedia, "Telemetry," available: https://en.wikipedia.org/wiki/Telemetry. [Accessed: June 5, 2024].
2. Apache Cassandra, "Cassandra," available: https://cassandra.apache.org. [Accessed: June 5, 2024].
3. VictoriaMetrics, "VictoriaMetrics," available: https://victoriametrics.com. [Accessed: June 6, 2024].
4. Fluentd, "Fluentd," available: https://www.fluentd.org. [Accessed: June 5, 2024].
5. Elasticsearch, "Elasticsearch," available: https://www.elastic.co. [Accessed: June 5, 2024].
6. InfluxData, "Telegraf," available: https://www.influxdata.com. [Accessed: June 5, 2024].
7. InfluxData, "Telegraf Plugins," available: https://docs.influxdata.com. [Accessed: June 5, 2024].
8. GitHub, "osx-cpu-temp," available: https://github.com/lavoiesl/osx-cpu-temp. [Accessed: June 5, 2024].
9. GitHub, "Inlets," available: https://github.com/inlets/inlets. [Accessed: June 5, 2024].
10. InfluxData, "Telegraf Installation," available: https://docs.influxdata.com/telegraf/v1. [Accessed: June 5, 2024].
11. InfluxData, "InfluxDB Line Protocol," available: https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol. [Accessed: June 5, 2024].
12. GitHub, "Telegraf Exec Plugin," available: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/exec. [Accessed: June 5, 2024].
13. GitHub, "Telegraf Output Plugins," available: https://github.com/influxdata/telegraf/tree/master/plugins/outputs. [Accessed: June 5, 2024].
14. Pallets Projects, "Flask," available: https://flask.palletsprojects.com. [Accessed: June 5, 2024].
15. Apache Kafka, "Kafka," available: https://kafka.apache.org. [Accessed: June 5, 2024].
16. Confluent, "Kafka Partitions," available: https://www.confluent.io. [Accessed: June 5, 2024].
17. AWS, "Amazon Kinesis," available: https://aws.amazon.com/kinesis. [Accessed: June 5, 2024].
18. Redpanda, "Redpanda," available: https://redpanda.com. [Accessed: June 5, 2024].
19. Apache, "Apache Flink," available: https://flink.apache.org. [Accessed: June 6, 2024].
20. GitHub, "flink-python/pyflink/examples," available: https://github.com/apache/flink/tree/master/flink-python/pyflink/examples. [Accessed: June 6, 2024].
21. Apache, "Flink Download," available: https://www.apache.org/dyn/closer.lua/flink. [Accessed: June 6, 2024].
22. Apache, "Flink Kafka Connector," available: https://www.apache.org/dyn/closer.lua/flink/flink-connector-kafka-3.1.0. [Accessed: June 6, 2024].
23. Docker, "Docker Installation," available: https://docs.docker.com. [Accessed: June 6, 2024].
24. Apache Kafka, "Kafka CLI," available: https://kafka.apache.org/quickstart. [Accessed: June 6, 2024].
25. Homebrew, "Kafka Installation," available: https://formulae.brew.sh/formula/kafka. [Accessed: June 6, 2024].
26. Apache, "Apache Storm," available: https://storm.apache.org. [Accessed: June 6, 2024].
27. Apache, "Apache Samza," available: https://samza.apache.org. [Accessed: June 6, 2024].
28. ClickHouse, "ClickHouse," available: https://clickhouse.com. [Accessed: June 6, 2024].
29. InfluxData, "InfluxDB Benchmarks," available: https://www.influxdata.com/benchmarks. [Accessed: June 6, 2024].
30. TiDB, "TiDB," available: https://github.com/pingcap/tidb. [Accessed: June 6, 2024].
31. Timescale, "TimescaleDB," available: https://www.timescale.com. [Accessed: June 6, 2024].
32. MindsDB, "MindsDB," available: https://docs.mindsdb.com. [Accessed: June 6, 2024].
33. Wikipedia, "Write Amplification," available: https://en.wikipedia.org/wiki/Write_amplification. [Accessed: June 6, 2024].
34. GitHub, "LSM-Tree," available: https://tikv.github.io/deep-dive/introduction/theory/lsm-tree.html. [Accessed: June 6, 2024].
Cite this article as: Adesh Nalpet Adimurthy. (Jun 7, 2024). Real-time insights: Telemetry Pipeline. PyBlog. https://www.pyblog.xyz/telemetry-pipeline