Writable and Transform Streams

Writable streams consume data, and transform streams modify data as it passes through. Together with readable streams, they form the complete streaming picture.

Writable Streams

Creating Writable Streams

const fs = require('fs');

// 'w' truncates/creates the file; 'a' would append instead.
// highWaterMark is the internal buffer size that write() compares
// against when reporting backpressure.
const writable = fs.createWriteStream('output.txt', {
  flags: 'w',
  encoding: 'utf8',
  highWaterMark: 16384  // 16 KiB
});

Writing Data

// write() returns false once the internal buffer is over highWaterMark;
// at that point the producer should stop until 'drain' fires.
if (!writable.write('Hello World\n')) {
  console.log('Buffer full, waiting for drain...');
}

// end() closes the writable side; its argument is written as the last chunk
writable.end('Final data\n');

Important Events

const writable = fs.createWriteStream('output.txt');

// Always handle 'error' — an unhandled stream error crashes the process
writable.on('error', (err) => {
  console.error('Write error:', err);
});

// Fires once the internal buffer empties after write() returned false
writable.on('drain', () => {
  console.log('Ready for more data');
});

// Fires after end() once every queued chunk has been flushed
writable.on('finish', () => {
  console.log('All data written');
});

// Fires when the underlying resource (e.g. file descriptor) is released
writable.on('close', () => {
  console.log('Stream closed');
});

Handling Backpressure

const readable = fs.createReadStream('huge.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  // write() returns false when the writable's buffer is over its limit:
  // stop reading until the writer catches up.
  if (!writable.write(chunk)) {
    readable.pause();
  }
});

// Buffer emptied — pick the read side back up
writable.on('drain', () => readable.resume());

// Source exhausted — close the writable side
readable.on('end', () => writable.end());

Custom Writable Stream

const { Writable } = require('stream');

// A sink that logs each chunk it consumes.
const myWritable = new Writable({
  // Invoked once per chunk; signal completion via the callback
  // (which may be called asynchronously)
  write(data, enc, done) {
    console.log('Received:', data.toString());
    done();
  },

  // Invoked after end(), before 'finish' is emitted
  final(done) {
    console.log('Stream complete');
    done();
  }
});

// Usage
myWritable.write('Hello');
myWritable.write('World');
myWritable.end();

Transform Streams

Transform streams are duplex streams that modify data as it passes through: they are writable on the input side and readable on the output side.

Built-in Transforms

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

// Compression
const gzip = zlib.createGzip();

// Encryption — aes-256-cbc requires a 32-byte key and a 16-byte IV.
// (The original example referenced `key` and `iv` without defining them,
// which throws a ReferenceError.)
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);

const cipher = crypto.createCipheriv(
  'aes-256-cbc',
  key,
  iv
);

// Chaining
fs.createReadStream('file.txt')
  .pipe(gzip)
  .pipe(fs.createWriteStream('file.txt.gz'));

Custom Transform Stream

const { Transform } = require('stream');

// Upper-cases every chunk that flows through.
const upperCase = new Transform({
  transform(chunk, enc, done) {
    // callback(err, data) is shorthand for this.push(data); callback()
    done(null, chunk.toString().toUpperCase());
  }
});

process.stdin
  .pipe(upperCase)
  .pipe(process.stdout);

Line Splitter Transform

const { Transform } = require('stream');

/**
 * Re-chunks arbitrary text input into newline-terminated lines.
 * A trailing partial line is held back until more data arrives, or
 * until the stream ends — then it is emitted as-is (no newline added).
 */
class LineSplitter extends Transform {
  constructor() {
    super();
    this.pending = '';
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.pending + chunk.toString()).split('\n');

    // The final piece has no newline yet — carry it into the next chunk
    this.pending = pieces.pop();

    pieces.forEach((line) => this.push(line + '\n'));
    callback();
  }

  _flush(callback) {
    // Emit whatever partial line remains at end-of-stream
    if (this.pending) {
      this.push(this.pending);
    }
    callback();
  }
}

Using through2 (Simplified Transforms)

// NOTE(review): through2 is a third-party package. On modern Node the
// built-in stream.Transform covers these use cases without a dependency.
const through2 = require('through2');

// Simple transform — must be a regular function (not an arrow) so that
// `this` is bound to the stream and this.push() works
const upperCase = through2(function(chunk, enc, callback) {
  this.push(chunk.toString().toUpperCase());
  callback();
});

// Object mode: emitted chunks are arbitrary JS values, not buffers/strings
const jsonParser = through2.obj(function(chunk, enc, callback) {
  try {
    const obj = JSON.parse(chunk);
    this.push(obj);
    callback();
  } catch (err) {
    // Passing the error to the callback emits 'error' on the stream
    callback(err);
  }
});

The pipeline() Function

Modern Node.js (v10+) provides pipeline(), which forwards an error from any stage to a single callback and destroys every stream in the chain when one fails:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// pipeline() wires the stages together and, if any stage fails,
// destroys every stream in the chain before invoking the callback.
pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
      return;
    }
    console.log('Pipeline succeeded');
  }
);

Async Pipeline (Promise-based)

const { pipeline } = require('stream/promises');

// Promise-based pipeline: resolves on success, rejects if any stage fails.
async function compress() {
  const source = fs.createReadStream('input.txt');
  const gzip = zlib.createGzip();
  const destination = fs.createWriteStream('output.txt.gz');

  await pipeline(source, gzip, destination);
  console.log('Compression complete');
}

Duplex Streams

Duplex streams are both readable and writable (independent channels):

const net = require('net');

// TCP socket is duplex
// (The unused `Duplex` require from the original snippet was removed.)
const socket = net.connect({ port: 3000 });

// Write to socket (writable side)
socket.write('Hello server');

// Read from socket (readable side)
socket.on('data', (data) => {
  console.log('Server said:', data.toString());
});

// Without an 'error' listener, a failed connection (e.g. nothing
// listening on port 3000) crashes the process with an uncaught error
socket.on('error', (err) => {
  console.error('Socket error:', err);
});

Custom Duplex

const { Duplex } = require('stream');

// Echo stream: anything written comes straight back out the readable side.
const echo = new Duplex({
  // Readable side — data is supplied by write(), so there is nothing to pull
  read() {},

  // Writable side — forward each chunk to the readable side
  write(chunk, encoding, callback) {
    this.push(chunk);
    callback();
  }
});

Practical Example: CSV to JSON

const { Transform } = require('stream');
const fs = require('fs');

/**
 * Streams CSV text in and emits one JSON string per data row (JSON Lines).
 * The first complete row is treated as the header row.
 *
 * NOTE(review): rows are split naively on ',' — quoted fields containing
 * commas are not supported.
 */
class CSVtoJSON extends Transform {
  constructor() {
    super({ objectMode: true });
    this.headers = null;
    this.buffer = '';
  }

  // Process one complete CSV row: the first row defines the headers,
  // every later row is emitted as a JSON object string.
  _processRow(line) {
    if (!line.trim()) return;

    const values = line.split(',');

    if (!this.headers) {
      this.headers = values;
      return;
    }

    const obj = {};
    this.headers.forEach((h, i) => {
      obj[h.trim()] = values[i]?.trim();
    });
    this.push(JSON.stringify(obj) + '\n');
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep partial line

    for (const line of lines) {
      this._processRow(line);
    }

    callback();
  }

  // BUG FIX: the original class had no _flush, so a final row without a
  // trailing newline was silently dropped at end-of-stream.
  _flush(callback) {
    if (this.buffer) {
      this._processRow(this.buffer);
    }
    callback();
  }
}

// Usage: data.csv in, one JSON object per line out
const source = fs.createReadStream('data.csv');
const sink = fs.createWriteStream('data.jsonl');
source.pipe(new CSVtoJSON()).pipe(sink);

Key Takeaways

  1. Writable streams consume data with .write()
  2. Handle backpressure by checking .write() return value
  3. Transform streams modify data as it flows through
  4. pipeline() properly handles errors and cleanup
  5. Duplex streams have independent read/write channels
  6. Object mode allows streaming JS objects, not just buffers