Readable Streams

Readable streams are sources of data. Understanding their two modes and how to consume them is essential for efficient data processing.

Creating Readable Streams

From Files

const fs = require('fs');

const readable = fs.createReadStream('data.txt', {
  encoding: 'utf8',      // Return strings instead of Buffers
  highWaterMark: 16384,  // Internal buffer limit and read chunk size (16 KB)
});

From HTTP Requests

const http = require('http');

http.get('http://example.com/data', (response) => {
  // response is a readable stream
  response.on('data', (chunk) => {
    console.log(chunk.toString());
  });
});

Custom Readable

const { Readable } = require('stream');

const readable = new Readable({
  read(size) {
    // Push data when read() is called
    this.push('Hello ');
    this.push('World!');
    this.push(null); // Signal end of stream
  }
});

Two Reading Modes

1. Flowing Mode

Data is read automatically and emitted via 'data' events:

const fs = require('fs');
const readable = fs.createReadStream('file.txt');

readable.on('data', (chunk) => {
  console.log(`Received: ${chunk.length} bytes`);
});

readable.on('end', () => {
  console.log('No more data');
});

How to enter flowing mode:

  - Attach a 'data' event listener
  - Call readable.resume()
  - Call readable.pipe(destination)

2. Paused Mode (Pull-Based)

Data is read on demand by calling .read():

const fs = require('fs');
const readable = fs.createReadStream('file.txt');

readable.on('readable', () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {
    console.log(`Received: ${chunk.length} bytes`);
  }
});

When to use paused mode:

  - You need fine-grained control over when data is consumed
  - You want to pull an exact number of bytes with read(n)
  - You are parsing a protocol where chunk boundaries matter

Mode Switching

const readable = fs.createReadStream('file.txt');

// Starts in paused mode
console.log(readable.readableFlowing); // null

// Switch to flowing
readable.on('data', () => {});
console.log(readable.readableFlowing); // true

// Pause
readable.pause();
console.log(readable.readableFlowing); // false

// Resume
readable.resume();
console.log(readable.readableFlowing); // true

Important Events

'data' Event

readable.on('data', (chunk) => {
  // chunk is Buffer or string (if encoding set)
  console.log(chunk);
});

'end' Event

readable.on('end', () => {
  // No more data to consume
  console.log('Stream ended');
});

'error' Event

readable.on('error', (err) => {
  console.error('Stream error:', err.message);
});

'close' Event

readable.on('close', () => {
  // Underlying resource closed
  console.log('Stream closed');
});

'readable' Event

readable.on('readable', () => {
  // New data available to read
  let chunk;
  while ((chunk = readable.read()) !== null) {
    process(chunk);
  }
});

The Internal Buffer

Readable streams have an internal buffer:

const readable = fs.createReadStream('file.txt', {
  highWaterMark: 1024 // Buffer holds up to 1KB
});

// Check buffer state
console.log(readable.readableLength);    // Bytes in buffer
console.log(readable.readableHighWaterMark); // Buffer limit

How It Works

┌───────────────┐     ┌────────────────┐     ┌──────────────┐
│  Data Source  │────>│ Internal Buffer│────>│  Your Code   │
│  (file, etc.) │     │ (highWaterMark)│     │ (data event) │
└───────────────┘     └────────────────┘     └──────────────┘
                              │
                    Pauses source when full
                    Resumes when drained

Practical Patterns

Reading Entire Stream to String

async function streamToString(readable) {
  const chunks = [];

  for await (const chunk of readable) {
    // Chunks are Buffers by default; convert in case an encoding was set
    chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
  }

  return Buffer.concat(chunks).toString('utf8');
}

// Usage
const content = await streamToString(
  fs.createReadStream('file.txt')
);

Processing Lines

const readline = require('readline');
const fs = require('fs');

const rl = readline.createInterface({
  input: fs.createReadStream('log.txt'),
  crlfDelay: Infinity
});

rl.on('line', (line) => {
  console.log(`Line: ${line}`);
});

Async Iteration (Modern Approach)

const fs = require('fs');

async function processFile() {
  const readable = fs.createReadStream('data.txt');

  for await (const chunk of readable) {
    // Process each chunk
    console.log(chunk.toString());
  }

  console.log('Done!');
}

Object Mode

Streams can work with objects instead of buffers:

const { Readable } = require('stream');

const readable = new Readable({
  objectMode: true,
  read() {
    this.push({ name: 'Alice', age: 30 });
    this.push({ name: 'Bob', age: 25 });
    this.push(null);
  }
});

readable.on('data', (obj) => {
  console.log(obj.name); // Alice, then Bob
});

Error Handling Best Practices

const readable = fs.createReadStream('file.txt');

readable.on('error', (err) => {
  if (err.code === 'ENOENT') {
    console.error('File not found');
  } else {
    console.error('Stream error:', err);
  }
});

// pipe() does NOT forward errors to the destination!
readable.pipe(writable);

// You must attach an 'error' handler to each stream:
readable.on('error', handleError);
writable.on('error', handleError);

// Or use pipeline (handles errors):
const { pipeline } = require('stream');

pipeline(
  readable,
  writable,
  (err) => {
    if (err) console.error('Pipeline failed:', err);
  }
);

Key Takeaways

  1. Two modes: Flowing (automatic) and Paused (manual)
  2. 'data' event puts stream in flowing mode
  3. 'readable' event for pull-based reading
  4. Internal buffer managed by highWaterMark
  5. Async iteration is the modern, clean approach
  6. Always handle errors - they don't propagate through pipe