Readable Streams
Readable streams are sources of data. Understanding their two modes and how to consume them is essential for efficient data processing.
Creating Readable Streams
From Files
const fs = require('fs');
const readable = fs.createReadStream('data.txt', {
  encoding: 'utf8',      // Return strings instead of Buffers
  highWaterMark: 16384,  // Chunk size (16 KB)
});
From HTTP Requests
const http = require('http');
http.get('http://example.com/data', (response) => {
  // response is a readable stream
  response.on('data', (chunk) => {
    console.log(chunk.toString());
  });
});
Custom Readable
const { Readable } = require('stream');
const readable = new Readable({
  read(size) {
    // Push data when read() is called
    this.push('Hello ');
    this.push('World!');
    this.push(null); // Signal end of stream
  }
});
Two Reading Modes
1. Flowing Mode
Data is read automatically and emitted via 'data' events:
const fs = require('fs');
const readable = fs.createReadStream('file.txt');
readable.on('data', (chunk) => {
  console.log(`Received: ${chunk.length} bytes`);
});
readable.on('end', () => {
  console.log('No more data');
});
How to enter flowing mode:
- Attach a 'data' event handler
- Call .pipe()
- Call .resume()
2. Paused Mode (Pull-Based)
Data is read on demand by calling .read():
const fs = require('fs');
const readable = fs.createReadStream('file.txt');
readable.on('readable', () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {
    console.log(`Received: ${chunk.length} bytes`);
  }
});
When to use paused mode:
- Need precise control over reading
- Processing requires waiting between chunks
- Implementing custom stream protocols
Mode Switching
const readable = fs.createReadStream('file.txt');
// Starts in paused mode
console.log(readable.readableFlowing); // null
// Switch to flowing
readable.on('data', () => {});
console.log(readable.readableFlowing); // true
// Pause
readable.pause();
console.log(readable.readableFlowing); // false
// Resume
readable.resume();
console.log(readable.readableFlowing); // true
Important Events
'data' Event
readable.on('data', (chunk) => {
  // chunk is Buffer or string (if encoding set)
  console.log(chunk);
});
'end' Event
readable.on('end', () => {
  // No more data to consume
  console.log('Stream ended');
});
'error' Event
readable.on('error', (err) => {
  console.error('Stream error:', err.message);
});
'close' Event
readable.on('close', () => {
  // Underlying resource closed
  console.log('Stream closed');
});
'readable' Event
readable.on('readable', () => {
  // New data available to read
  let chunk;
  while ((chunk = readable.read()) !== null) {
    process(chunk);
  }
});
The Internal Buffer
Readable streams have an internal buffer:
const readable = fs.createReadStream('file.txt', {
  highWaterMark: 1024 // Buffer holds up to 1 KB
});
// Check buffer state
console.log(readable.readableLength); // Bytes in buffer
console.log(readable.readableHighWaterMark); // Buffer limit
How It Works
┌──────────────┐     ┌─────────────────┐     ┌──────────────┐
│ Data Source  │────>│ Internal Buffer │────>│  Your Code   │
│ (file, etc.) │     │ (highWaterMark) │     │ (data event) │
└──────────────┘     └─────────────────┘     └──────────────┘
                             │
                   Pauses source when full,
                   resumes when drained
Practical Patterns
Reading Entire Stream to String
async function streamToString(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  // Assumes Buffer chunks; if an encoding was set on the stream,
  // the chunks are strings and Buffer.concat() would throw.
  return Buffer.concat(chunks).toString('utf8');
}
// Usage
const content = await streamToString(
  fs.createReadStream('file.txt')
);
Processing Lines
const readline = require('readline');
const fs = require('fs');
const rl = readline.createInterface({
  input: fs.createReadStream('log.txt'),
  crlfDelay: Infinity // treat \r\n as a single line break
});
rl.on('line', (line) => {
  console.log(`Line: ${line}`);
});
Async Iteration (Modern Approach)
const fs = require('fs');
async function processFile() {
  const readable = fs.createReadStream('data.txt');
  for await (const chunk of readable) {
    // Process each chunk
    console.log(chunk.toString());
  }
  console.log('Done!');
}
Object Mode
Streams can work with objects instead of buffers:
const { Readable } = require('stream');
const readable = new Readable({
  objectMode: true,
  read() {
    this.push({ name: 'Alice', age: 30 });
    this.push({ name: 'Bob', age: 25 });
    this.push(null);
  }
});
readable.on('data', (obj) => {
  console.log(obj.name); // Alice, then Bob
});
Error Handling Best Practices
const readable = fs.createReadStream('file.txt');
readable.on('error', (err) => {
  if (err.code === 'ENOENT') {
    console.error('File not found');
  } else {
    console.error('Stream error:', err);
  }
});
// With pipe(), errors are not forwarded from one stream to the next!
readable.pipe(writable);
// You must handle errors on both:
readable.on('error', handleError);
writable.on('error', handleError);
// Or use pipeline(), which forwards errors and cleans up:
const { pipeline } = require('stream');
pipeline(
  readable,
  writable,
  (err) => {
    if (err) console.error('Pipeline failed:', err);
  }
);
Key Takeaways
- Two modes: Flowing (automatic) and Paused (manual)
- 'data' event puts stream in flowing mode
- 'readable' event for pull-based reading
- Internal buffer managed by highWaterMark
- Async iteration is the modern, clean approach
- Always handle errors - they don't propagate through pipe