ข้ามไปเนื้อหาหลัก

Category: guide

Node.js Streams — Readable, Writable, Transform, Pipeline

จัดการข้อมูลปริมาณมากด้วย Node.js Streams: pipe, pipeline, Transform streams, backpressure, และ async iteration

· อ่านประมาณ 4 นาที

สารบัญ

ทำไมต้องใช้ Streams

// ❌ อ่านทั้งไฟล์เข้า memory — พัง ถ้าไฟล์ใหญ่กว่า RAM
const content = fs.readFileSync('huge-file.csv');  // 10GB file → OOM

// ✓ Stream: อ่านทีละ chunk — ใช้ RAM คงที่ไม่ว่าไฟล์จะใหญ่แค่ไหน
fs.createReadStream('huge-file.csv').pipe(processStream).pipe(outputStream);

Readable Stream

import fs from 'node:fs';
import { createReadStream } from 'node:fs';

// อ่านไฟล์เป็น stream
const readable = createReadStream('data.csv', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024,  // chunk size: 64KB (default: 64KB)
});

// Event-based
readable.on('data', (chunk) => {
  console.log('Got chunk:', chunk.length, 'bytes');
});
readable.on('end', () => console.log('Done'));
readable.on('error', (err) => console.error(err));

// Async iteration (ง่ายกว่า events)
for await (const chunk of readable) {
  process.stdout.write(chunk);
}

สร้าง Readable จ้าก Generator

import { Readable } from 'node:stream';

async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    yield `line ${i}\n`;
    // simulate async work
    await new Promise((r) => setTimeout(r, 0));
  }
}

const stream = Readable.from(generateData());
for await (const line of stream) {
  process.stdout.write(line);
}

Writable Stream

import { createWriteStream } from 'node:fs';

const writable = createWriteStream('output.txt', { encoding: 'utf8' });

// เขียน — returns false ถ้า buffer เต็ม (backpressure signal)
const canContinue = writable.write('Hello, World!\n');

// ถ้า write() return false ต้องรอ 'drain' event ก่อนเขียนต่อ
if (!canContinue) {
  writable.once('drain', () => {
    writable.write('More data...');
  });
}

writable.end('Last line\n');  // signal ว่าเขียนเสร็จ
writable.on('finish', () => console.log('File written'));

Transform Stream

Transform รับข้อมูลจาก Readable และส่งออกไปยัง Writable พร้อมแปลงข้อมูล

import { Transform } from 'node:stream';

// Transform: แปลง CSV เป็น JSON
class CSVToJSON extends Transform {
  constructor() {
    super({ objectMode: true });  // ส่ง object แทน Buffer/string
    this._headers = null;
    this._buffer = '';
  }

  _transform(chunk, _encoding, callback) {
    this._buffer += chunk.toString();
    const lines = this._buffer.split('\n');
    this._buffer = lines.pop() ?? '';  // เก็บบรรทัดที่ยังไม่ครบ

    for (const line of lines) {
      if (!line.trim()) continue;
      const values = line.split(',');

      if (!this._headers) {
        this._headers = values;
      } else {
        const obj = Object.fromEntries(
          this._headers.map((h, i) => [h.trim(), values[i]?.trim()])
        );
        this.push(obj);
      }
    }
    callback();
  }

  _flush(callback) {
    // จัดการ data ที่ค้างใน buffer
    if (this._buffer.trim() && this._headers) {
      const values = this._buffer.split(',');
      const obj = Object.fromEntries(
        this._headers.map((h, i) => [h.trim(), values[i]?.trim()])
      );
      this.push(obj);
    }
    callback();
  }
}

// ใช้งาน
const readable = fs.createReadStream('data.csv');
const transform = new CSVToJSON();

for await (const row of readable.pipe(transform)) {
  console.log(row);  // { name: 'Alice', age: '30' }
}

pipeline — The Right Way to Pipe

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

// ✓ pipeline cleanup ถ้ามี error ใน step ไหนก็ตาม
await pipeline(
  createReadStream('input.txt'),
  createGzip(),                   // compress
  createWriteStream('output.txt.gz')
);

// เทียบกับ pipe() ที่ไม่ cleanup:
// ❌ ถ้า error เกิดใน step กลาง → streams อื่นยัง open ค้างอยู่
readable.pipe(transform).pipe(writable);

Backpressure

import { Readable, Writable, pipeline } from 'node:stream';

// ❌ ไม่ handle backpressure — อาจ memory leak ถ้า writable ช้ากว่า readable
for await (const chunk of readable) {
  writable.write(chunk);  // ไม่รอถ้า buffer เต็ม
}

// ✓ ใช้ pipeline — handle backpressure อัตโนมัติ
await pipeline(readable, writable);

// ✓ หรือ handle เอง
async function copyWithBackpressure(src, dst) {
  for await (const chunk of src) {
    const ok = dst.write(chunk);
    if (!ok) await new Promise((r) => dst.once('drain', r));
  }
  dst.end();
}

Practical: Process Large CSV

import { createReadStream } from 'node:fs';
import { Transform, pipeline } from 'node:stream';
import { pipeline as pipelinePromise } from 'node:stream/promises';
import { createInterface } from 'node:readline';

async function processLargeCSV(filePath) {
  const fileStream = createReadStream(filePath);

  // readline ทำให้อ่านทีละบรรทัดได้ง่าย
  const rl = createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });

  let headers = null;
  let count = 0;
  let totalAge = 0;

  for await (const line of rl) {
    const values = line.split(',').map((v) => v.trim());
    if (!headers) {
      headers = values;
      continue;
    }
    const row = Object.fromEntries(headers.map((h, i) => [h, values[i]]));
    count++;
    totalAge += Number(row.age) || 0;
  }

  console.log(`Processed ${count} rows, avg age: ${(totalAge / count).toFixed(1)}`);
}

await processLargeCSV('users.csv');  // works on 10GB file, uses ~50MB RAM

Stream ใน HTTP

import http from 'node:http';
import fs from 'node:fs';

// ส่งไฟล์ใหญ่โดยไม่โหลดเข้า memory
http.createServer((req, res) => {
  const stat = fs.statSync('video.mp4');
  res.writeHead(200, {
    'Content-Type': 'video/mp4',
    'Content-Length': stat.size,
  });
  fs.createReadStream('video.mp4').pipe(res);
}).listen(3000);

// Range requests สำหรับ video seeking
http.createServer((req, res) => {
  const range = req.headers.range;
  const stat = fs.statSync('video.mp4');

  if (range) {
    const [start, end] = range.replace('bytes=', '').split('-').map(Number);
    const chunkEnd = end || Math.min(start + 10 * 1024 * 1024 - 1, stat.size - 1);
    res.writeHead(206, {
      'Content-Range': `bytes ${start}-${chunkEnd}/${stat.size}`,
      'Content-Length': chunkEnd - start + 1,
      'Content-Type': 'video/mp4',
    });
    fs.createReadStream('video.mp4', { start, end: chunkEnd }).pipe(res);
  }
}).listen(3000);