Streams: Introduction
Streams are one of Node's most powerful features. They let you process data piece by piece, without loading everything into memory at once.
The Problem Streams Solve
Without streams:

```javascript
const fs = require('fs');

// This loads the ENTIRE file into memory
const data = fs.readFileSync('huge-file.csv'); // 2GB file = 2GB RAM
processData(data);
```

With streams:

```javascript
const fs = require('fs');

// This processes the file in small chunks
const stream = fs.createReadStream('huge-file.csv');
stream.on('data', (chunk) => {
  processChunk(chunk); // ~64KB at a time
});
```
Four Types of Streams
```
┌─────────────┐      ┌──────────────┐      ┌─────────────┐
│  Readable   │─────>│  Transform   │─────>│  Writable   │
│   Stream    │      │    Stream    │      │   Stream    │
└─────────────┘      └──────────────┘      └─────────────┘
                            │
                     (can also be)
                            │
                     ┌──────────────┐
                     │    Duplex    │
                     │    Stream    │
                     └──────────────┘
```
1. Readable Streams
Data flows out (you read from them):
- `fs.createReadStream()`
- `http.IncomingMessage` (request body)
- `process.stdin`
2. Writable Streams
Data flows in (you write to them):
- `fs.createWriteStream()`
- `http.ServerResponse` (response body)
- `process.stdout`
3. Duplex Streams
Both readable AND writable (separate channels):
- `net.Socket` (TCP connections)
4. Transform Streams
Duplex streams that modify data as it passes:
- `zlib.createGzip()` (compression)
- `crypto.createCipheriv()` (encryption)
- Custom transformations
The Chunk Concept
Streams don't deal with the data as a whole - they work with chunks:
```javascript
const fs = require('fs');

const readable = fs.createReadStream('file.txt');
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
  console.log(`Type: ${chunk.constructor.name}`); // Buffer
});

// Output:
// Received 65536 bytes
// Type: Buffer
// Received 65536 bytes
// Type: Buffer
// Received 12843 bytes
// Type: Buffer
```
The default chunk size (`highWaterMark`) is 16 KiB for most streams and 64 KiB for file read streams.
Buffers: Raw Binary Data
Chunks in Node are typically Buffer objects:
```javascript
// Creating buffers
const buf1 = Buffer.from('Hello');
const buf2 = Buffer.alloc(10); // 10 zero-filled bytes

// Buffer operations
console.log(buf1.toString()); // 'Hello'
console.log(buf1.length);     // 5
console.log(buf1[0]);         // 72 (ASCII 'H')

// Concatenating
const combined = Buffer.concat([buf1, buf2]);
```
Stream Events
All streams emit events:
```javascript
const fs = require('fs');

const readable = fs.createReadStream('file.txt');

// Readable events
readable.on('data', (chunk) => {});  // Data available
readable.on('end', () => {});        // No more data
readable.on('error', (err) => {});   // Error occurred
readable.on('close', () => {});      // Stream closed
readable.on('readable', () => {});   // Data can be read

const writable = fs.createWriteStream('out.txt');

// Writable events
writable.on('drain', () => {});      // Ready for more data
writable.on('finish', () => {});     // All data flushed
writable.on('error', (err) => {});   // Error occurred
writable.on('close', () => {});      // Stream closed
```
The Pipe Pattern
The most elegant way to connect streams:
```javascript
const fs = require('fs');

// Without pipe (manual)
const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  writable.write(chunk); // note: ignores backpressure (see below)
});
readable.on('end', () => {
  writable.end();
});

// With pipe (elegant)
fs.createReadStream('input.txt')
  .pipe(fs.createWriteStream('output.txt'));
```
Chaining Pipes
```javascript
const fs = require('fs');
const zlib = require('zlib');

// Read → Compress → Write
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));

// Read compressed → Decompress → Write
fs.createReadStream('input.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('output.txt'));
```
Backpressure
When the writable side can't keep up with the readable side, chunks pile up in memory. `write()` returns `false` once the writable's internal buffer is full:

```javascript
const fs = require('fs');

const readable = fs.createReadStream('huge.file');
const writable = fs.createWriteStream('output.file');

readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // Writable's internal buffer is full - stop reading
    readable.pause();
    writable.once('drain', () => {
      // Buffer drained, resume reading
      readable.resume();
    });
  }
});
```
`pipe()` handles backpressure automatically!
Real-World Example: HTTP Server
```javascript
const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  // req is a Readable stream (request body)
  // res is a Writable stream (response body)
  if (req.url === '/video') {
    const videoStream = fs.createReadStream('video.mp4');

    // Stream the video directly to the response
    res.setHeader('Content-Type', 'video/mp4');
    videoStream.pipe(res);
    // Efficient: never loads the entire video into memory!
  } else {
    res.statusCode = 404;
    res.end();
  }
}).listen(3000);
```
When to Use Streams
| Scenario | Use Streams? |
|---|---|
| Reading large files | Yes |
| HTTP file uploads | Yes |
| Video/audio streaming | Yes |
| Small JSON configs | No (just use readFile) |
| Database query results | Yes (cursor streaming) |
| Log file processing | Yes |
Key Takeaways
- Streams process data in chunks, not all at once
- Four types: Readable, Writable, Duplex, Transform
- Buffers hold raw binary data (chunks)
- Events signal data availability and completion
- `pipe()` connects streams and handles backpressure
- Memory efficient for large data processing