Streams: Introduction

Streams are one of Node's most powerful features. They let you process data piece by piece, without loading everything into memory at once.

The Problem Streams Solve

Without streams:

const fs = require('fs');

// This loads ENTIRE file into memory
const data = fs.readFileSync('huge-file.csv'); // 2GB = 2GB RAM
processData(data);

With streams:

const fs = require('fs');

// This processes file in small chunks
const stream = fs.createReadStream('huge-file.csv');
stream.on('data', (chunk) => {
  processChunk(chunk); // 64KB at a time
});

Four Types of Streams

┌─────────────┐      ┌──────────────┐      ┌─────────────┐
│  Readable   │─────>│   Transform  │─────>│  Writable   │
│   Stream    │      │    Stream    │      │   Stream    │
└─────────────┘      └──────────────┘      └─────────────┘
                            │
                     (can also be)
                            │
                     ┌──────────────┐
                     │    Duplex    │
                     │    Stream    │
                     └──────────────┘

1. Readable Streams

Data flows out (you read from them):

2. Writable Streams

Data flows in (you write to them):

3. Duplex Streams

Both readable AND writable (separate channels):

4. Transform Streams

Duplex streams that modify data as it passes:

The Chunk Concept

Streams don't deal with the data as a whole; they work with chunks:

const fs = require('fs');

const readable = fs.createReadStream('file.txt');

readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
  console.log(`Type: ${chunk.constructor.name}`); // Buffer
});

// Output:
// Received 65536 bytes
// Type: Buffer
// Received 65536 bytes
// Type: Buffer
// Received 12843 bytes
// Type: Buffer

The default chunk size is controlled by the highWaterMark option: 16KB for most streams, 64KB for file streams. (Strictly, highWaterMark is a buffering threshold rather than a guaranteed chunk size, but file streams read that many bytes at a time.)

Buffers: Raw Binary Data

Chunks in Node are typically Buffer objects:

// Creating buffers
const buf1 = Buffer.from('Hello');
const buf2 = Buffer.alloc(10); // 10 zero bytes

// Buffer operations
console.log(buf1.toString());         // 'Hello'
console.log(buf1.length);             // 5
console.log(buf1[0]);                 // 72 (ASCII 'H')

// Concatenating
const combined = Buffer.concat([buf1, buf2]);

Stream Events

All streams emit events:

const fs = require('fs');

const readable = fs.createReadStream('file.txt');

// Readable events
readable.on('data', (chunk) => {});    // Data available
readable.on('end', () => {});          // No more data
readable.on('error', (err) => {});     // Error occurred
readable.on('close', () => {});        // Stream closed
readable.on('readable', () => {});     // Data can be read

const writable = fs.createWriteStream('out.txt');

// Writable events
writable.on('drain', () => {});        // Ready for more data
writable.on('finish', () => {});       // All data flushed
writable.on('error', (err) => {});     // Error occurred
writable.on('close', () => {});        // Stream closed

The Pipe Pattern

The most elegant way to connect streams:

const fs = require('fs');

// Without pipe (manual)
const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  writable.write(chunk);
});
readable.on('end', () => {
  writable.end();
});

// With pipe (elegant)
fs.createReadStream('input.txt')
  .pipe(fs.createWriteStream('output.txt'));

Chaining Pipes

const fs = require('fs');
const zlib = require('zlib');

// Read → Compress → Write
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));

// Read compressed → Decompress → Write
fs.createReadStream('input.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('output.txt'));

Backpressure

When the writable side can't keep up with the readable side:

const fs = require('fs');

const readable = fs.createReadStream('huge.file');
const writable = fs.createWriteStream('output.file');

readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // Writable's buffer is full!
    readable.pause();

    writable.once('drain', () => {
      // Buffer drained, resume reading
      readable.resume();
    });
  }
});

pipe() handles backpressure automatically!

Real-World Example: HTTP Server

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  // req is a Readable stream (request body)
  // res is a Writable stream (response body)

  if (req.url === '/video') {
    const videoStream = fs.createReadStream('video.mp4');

    // Stream video directly to response
    res.setHeader('Content-Type', 'video/mp4');
    videoStream.pipe(res);

    // Efficient: never loads entire video into memory!
  }
}).listen(3000);

When to Use Streams

Scenario                   Use Streams?
Reading large files        Yes
HTTP file uploads          Yes
Video/audio streaming      Yes
Small JSON configs         No (just use readFile)
Database query results     Yes (cursor streaming)
Log file processing        Yes

Key Takeaways

  1. Streams process data in chunks, not all at once
  2. Four types: Readable, Writable, Duplex, Transform
  3. Buffers hold raw binary data (chunks)
  4. Events signal data availability and completion
  5. pipe() connects streams and handles backpressure
  6. Memory efficient for large data processing