Writable and Transform Streams
Writable streams consume data, and transform streams modify data as it passes through. Together with readable streams, they form the complete streaming picture.
Writable Streams
Creating Writable Streams
const fs = require('fs');
// Options for the file write stream, kept in a named object for readability
const writeOptions = {
  flags: 'w', // 'w' write, 'a' append
  highWaterMark: 16384, // 16KB buffer
  encoding: 'utf8',
};
const writable = fs.createWriteStream('output.txt', writeOptions);
Writing Data
// write() answers with a boolean: may more data be written right away?
const hasRoom = writable.write('Hello World\n');
// false means the buffer is full - the producer should pause until 'drain'
if (hasRoom === false) {
  console.log('Buffer full, waiting for drain...');
}
// Write one last chunk and signal that no more data will follow
writable.end('Final data\n');
Important Events
const writable = fs.createWriteStream('output.txt');
// on() returns the stream itself, so the listeners can be registered as one chain
writable
  // Buffer drained, ready for more data
  .on('drain', () => console.log('Ready for more data'))
  // All data has been flushed
  .on('finish', () => console.log('All data written'))
  // Error occurred
  .on('error', (err) => console.error('Write error:', err))
  // Stream closed
  .on('close', () => console.log('Stream closed'));
Handling Backpressure
const readable = fs.createReadStream('huge.txt');
const writable = fs.createWriteStream('output.txt');

// Forward every chunk; stop reading as soon as the destination buffer is full
readable.on('data', (chunk) => {
  if (writable.write(chunk)) {
    return;
  }
  readable.pause();
});

// Destination buffer drained - pick reading back up
writable.on('drain', () => readable.resume());

// Source exhausted - close the destination
readable.on('end', () => writable.end());
Custom Writable Stream
const { Writable } = require('stream');
// Writable that logs each chunk it receives and announces when the stream ends
const myWritable = new Writable({
  // Handles one chunk; `done` may also be invoked asynchronously
  write: (chunk, _encoding, done) => {
    const text = chunk.toString();
    console.log('Received:', text);
    done();
  },
  // Runs once after end() when every chunk has been handled
  final: (done) => {
    console.log('Stream complete');
    done();
  },
});
// Usage: send two chunks, then close the stream
for (const word of ['Hello', 'World']) {
  myWritable.write(word);
}
myWritable.end();
Transform Streams
Transform streams are duplex streams that modify data as it passes through.
Built-in Transforms
const zlib = require('zlib');
const crypto = require('crypto');
// Compression
const gzip = zlib.createGzip();
// Encryption: aes-256-cbc requires a 32-byte key and a 16-byte IV.
// Random values are generated here so the example runs on its own; a real
// application must load the key from secure storage and keep the IV with
// the ciphertext so the data can be decrypted later.
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
// Chaining: each pipe() returns its destination, so transforms can be linked
fs.createReadStream('file.txt')
  .pipe(gzip)
  .pipe(fs.createWriteStream('file.txt.gz'));
Custom Transform Stream
const { Transform } = require('stream');
// Transform that emits every incoming chunk as upper-case text
const upperCase = new Transform({
  transform(chunk, _encoding, done) {
    // Passing the result as the callback's second argument pushes it downstream
    done(null, chunk.toString().toUpperCase());
  },
});
// Route stdin through the upper-casing transform and on to stdout
const shouted = process.stdin.pipe(upperCase);
shouted.pipe(process.stdout);
Line Splitter Transform
const { Transform } = require('stream');
/**
 * Transform stream that re-chunks incoming text on '\n' boundaries.
 * Every complete line is pushed as its own chunk (newline included);
 * text after the last newline is held back until more data arrives or
 * the stream ends.
 */
class LineSplitter extends Transform {
  constructor() {
    super();
    // Text received so far that is not yet terminated by '\n'
    this.buffer = '';
  }

  /**
   * Appends the chunk to the pending text and emits every complete line.
   * @param {Buffer|string} chunk - incoming data
   * @param {string} encoding - unused
   * @param {Function} callback - invoked once the chunk has been handled
   */
  _transform(chunk, encoding, callback) {
    const segments = `${this.buffer}${chunk.toString()}`.split('\n');
    // The final segment has no terminating newline yet, so it stays buffered
    this.buffer = segments[segments.length - 1];
    segments.slice(0, -1).forEach((segment) => {
      this.push(`${segment}\n`);
    });
    callback();
  }

  /**
   * Emits whatever unterminated text is left when the input ends.
   * @param {Function} callback - invoked once the remainder has been pushed
   */
  _flush(callback) {
    if (this.buffer.length > 0) {
      this.push(this.buffer);
    }
    callback();
  }
}
Using through2 (Simplified Transforms)
const through2 = require('through2');
// Simple transform: through2 wraps a plain function into a Transform stream
const upperCase = through2(function toUpperCase(chunk, enc, callback) {
  const shouted = chunk.toString().toUpperCase();
  this.push(shouted);
  callback();
});
// Object mode: parse each incoming chunk as JSON and emit the parsed value
const jsonParser = through2.obj(function(chunk, enc, callback) {
  let parsed;
  try {
    parsed = JSON.parse(chunk);
  } catch (err) {
    // Malformed JSON: fail the stream with the parse error
    callback(err);
    return;
  }
  // Deliberately outside the try block: an exception thrown while pushing or
  // inside the callback must not be caught and reported through a second
  // callback(err) call.
  this.push(parsed);
  callback();
});
The pipeline() Function
Modern Node.js provides pipeline() for better error handling:
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// Completion handler: receives an error if any stage of the pipeline failed
const reportOutcome = (err) => {
  if (!err) {
    console.log('Pipeline succeeded');
    return;
  }
  console.error('Pipeline failed:', err);
};

// Read input.txt, gzip it, and write the result to output.txt.gz
const source = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const destination = fs.createWriteStream('output.txt.gz');
pipeline(source, gzip, destination, reportOutcome);
Async Pipeline (Promise-based)
const { pipeline } = require('stream/promises');
/**
 * Gzips input.txt into output.txt.gz using the promise-based pipeline().
 * Resolves after the pipeline completes; rejects if the pipeline rejects.
 * @returns {Promise<void>}
 */
async function compress() {
  const source = fs.createReadStream('input.txt');
  const gzip = zlib.createGzip();
  const destination = fs.createWriteStream('output.txt.gz');
  await pipeline(source, gzip, destination);
  console.log('Compression complete');
}
Duplex Streams
Duplex streams are both readable and writable (independent channels):
const { Duplex } = require('stream');
const net = require('net');
// A TCP socket is a duplex stream: it can be written to and read from
const socket = net.connect({ port: 3000 });

// Writable side: send data to the server
socket.write('Hello server');

// Readable side: log whatever the server sends back
const logReply = (reply) => {
  console.log('Server said:', reply.toString());
};
socket.on('data', logReply);
Custom Duplex
const { Duplex } = require('stream');
// Writable side: hand every written chunk straight to the readable side
function loopBack(chunk, encoding, done) {
  this.push(chunk);
  done();
}

// Readable side: nothing to do on demand, data is supplied by loopBack()
function readNothing(size) {}

// Duplex stream that echoes everything written to it
const echo = new Duplex({
  read: readNothing,
  write: loopBack,
});
Practical Example: CSV to JSON
const { Transform } = require('stream');
const fs = require('fs');
/**
 * Transform stream that converts CSV text into JSON Lines.
 *
 * The first non-blank line supplies the column headers. Every following
 * non-blank line is emitted as one JSON object string terminated by '\n',
 * keyed by the trimmed header names with trimmed values.
 * Fields are split on ',' only; quoted fields are not supported.
 */
class CSVtoJSON extends Transform {
  constructor() {
    super({ objectMode: true });
    // Column names, taken from the first non-blank line
    this.headers = null;
    // Text received so far that is not yet terminated by '\n'
    this.buffer = '';
  }

  /**
   * Buffers the chunk and converts every complete line it now contains.
   * @param {Buffer|string} chunk - incoming CSV text
   * @param {string} encoding - unused
   * @param {Function} callback - invoked once the chunk has been handled
   */
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep partial line
    for (const line of lines) {
      this._processLine(line);
    }
    callback();
  }

  /**
   * Converts the final line when the input does not end with a newline;
   * without this step the last record would be silently dropped.
   * @param {Function} callback - invoked once the remainder has been handled
   */
  _flush(callback) {
    const remainder = this.buffer;
    this.buffer = '';
    this._processLine(remainder);
    callback();
  }

  /**
   * Handles one complete line: skips blanks, records the header row, or
   * pushes the row as a JSON string.
   * @param {string} line - a single line without its trailing '\n'
   */
  _processLine(line) {
    if (!line.trim()) return;
    const values = line.split(',');
    if (!this.headers) {
      this.headers = values;
      return;
    }
    const obj = {};
    this.headers.forEach((h, i) => {
      // Rows shorter than the header leave the missing fields undefined,
      // which JSON.stringify omits from the output
      obj[h.trim()] = values[i]?.trim();
    });
    this.push(JSON.stringify(obj) + '\n');
  }
}
// Usage: convert data.csv into newline-delimited JSON in data.jsonl
const jsonRows = fs.createReadStream('data.csv').pipe(new CSVtoJSON());
jsonRows.pipe(fs.createWriteStream('data.jsonl'));
Key Takeaways
- Writable streams consume data with .write()
- Handle backpressure by checking the .write() return value
- Transform streams modify data as it flows through
- pipeline() properly handles errors and cleanup
- Duplex streams have independent read/write channels
- Object mode allows streaming JS objects, not just buffers