Category: guide
Node.js Streams — Readable, Writable, Transform, Pipeline
จัดการข้อมูลปริมาณมากด้วย Node.js Streams: pipe, pipeline, Transform streams, backpressure, และ async iteration
สารบัญ
ทำไมต้องใช้ Streams
// ❌ อ่านทั้งไฟล์เข้า memory — พัง ถ้าไฟล์ใหญ่กว่า RAM
const content = fs.readFileSync('huge-file.csv'); // 10GB file → OOM
// ✓ Stream: อ่านทีละ chunk — ใช้ RAM คงที่ไม่ว่าไฟล์จะใหญ่แค่ไหน
fs.createReadStream('huge-file.csv').pipe(processStream).pipe(outputStream);
Readable Stream
import fs from 'node:fs';
import { createReadStream } from 'node:fs';
// อ่านไฟล์เป็น stream
const readable = createReadStream('data.csv', {
encoding: 'utf8',
highWaterMark: 64 * 1024, // chunk size: 64KB (default: 64KB)
});
// Event-based
readable.on('data', (chunk) => {
console.log('Got chunk:', chunk.length, 'bytes');
});
readable.on('end', () => console.log('Done'));
readable.on('error', (err) => console.error(err));
// Async iteration (ง่ายกว่า events)
for await (const chunk of readable) {
process.stdout.write(chunk);
}
สร้าง Readable จ้าก Generator
import { Readable } from 'node:stream';
async function* generateData() {
for (let i = 0; i < 1000; i++) {
yield `line ${i}\n`;
// simulate async work
await new Promise((r) => setTimeout(r, 0));
}
}
const stream = Readable.from(generateData());
for await (const line of stream) {
process.stdout.write(line);
}
Writable Stream
import { createWriteStream } from 'node:fs';
const writable = createWriteStream('output.txt', { encoding: 'utf8' });
// เขียน — returns false ถ้า buffer เต็ม (backpressure signal)
const canContinue = writable.write('Hello, World!\n');
// ถ้า write() return false ต้องรอ 'drain' event ก่อนเขียนต่อ
if (!canContinue) {
writable.once('drain', () => {
writable.write('More data...');
});
}
writable.end('Last line\n'); // signal ว่าเขียนเสร็จ
writable.on('finish', () => console.log('File written'));
Transform Stream
Transform รับข้อมูลจาก Readable และส่งออกไปยัง Writable พร้อมแปลงข้อมูล
import { Transform } from 'node:stream';
// Transform: แปลง CSV เป็น JSON
class CSVToJSON extends Transform {
constructor() {
super({ objectMode: true }); // ส่ง object แทน Buffer/string
this._headers = null;
this._buffer = '';
}
_transform(chunk, _encoding, callback) {
this._buffer += chunk.toString();
const lines = this._buffer.split('\n');
this._buffer = lines.pop() ?? ''; // เก็บบรรทัดที่ยังไม่ครบ
for (const line of lines) {
if (!line.trim()) continue;
const values = line.split(',');
if (!this._headers) {
this._headers = values;
} else {
const obj = Object.fromEntries(
this._headers.map((h, i) => [h.trim(), values[i]?.trim()])
);
this.push(obj);
}
}
callback();
}
_flush(callback) {
// จัดการ data ที่ค้างใน buffer
if (this._buffer.trim() && this._headers) {
const values = this._buffer.split(',');
const obj = Object.fromEntries(
this._headers.map((h, i) => [h.trim(), values[i]?.trim()])
);
this.push(obj);
}
callback();
}
}
// ใช้งาน
const readable = fs.createReadStream('data.csv');
const transform = new CSVToJSON();
for await (const row of readable.pipe(transform)) {
console.log(row); // { name: 'Alice', age: '30' }
}
pipeline — The Right Way to Pipe
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
// ✓ pipeline cleanup ถ้ามี error ใน step ไหนก็ตาม
await pipeline(
createReadStream('input.txt'),
createGzip(), // compress
createWriteStream('output.txt.gz')
);
// เทียบกับ pipe() ที่ไม่ cleanup:
// ❌ ถ้า error เกิดใน step กลาง → streams อื่นยัง open ค้างอยู่
readable.pipe(transform).pipe(writable);
Backpressure
import { Readable, Writable, pipeline } from 'node:stream';
// ❌ ไม่ handle backpressure — อาจ memory leak ถ้า writable ช้ากว่า readable
for await (const chunk of readable) {
writable.write(chunk); // ไม่รอถ้า buffer เต็ม
}
// ✓ ใช้ pipeline — handle backpressure อัตโนมัติ
await pipeline(readable, writable);
// ✓ หรือ handle เอง
async function copyWithBackpressure(src, dst) {
for await (const chunk of src) {
const ok = dst.write(chunk);
if (!ok) await new Promise((r) => dst.once('drain', r));
}
dst.end();
}
Practical: Process Large CSV
import { createReadStream } from 'node:fs';
import { Transform, pipeline } from 'node:stream';
import { pipeline as pipelinePromise } from 'node:stream/promises';
import { createInterface } from 'node:readline';
async function processLargeCSV(filePath) {
const fileStream = createReadStream(filePath);
// readline ทำให้อ่านทีละบรรทัดได้ง่าย
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity,
});
let headers = null;
let count = 0;
let totalAge = 0;
for await (const line of rl) {
const values = line.split(',').map((v) => v.trim());
if (!headers) {
headers = values;
continue;
}
const row = Object.fromEntries(headers.map((h, i) => [h, values[i]]));
count++;
totalAge += Number(row.age) || 0;
}
console.log(`Processed ${count} rows, avg age: ${(totalAge / count).toFixed(1)}`);
}
await processLargeCSV('users.csv'); // works on 10GB file, uses ~50MB RAM
Stream ใน HTTP
import http from 'node:http';
import fs from 'node:fs';
// ส่งไฟล์ใหญ่โดยไม่โหลดเข้า memory
http.createServer((req, res) => {
const stat = fs.statSync('video.mp4');
res.writeHead(200, {
'Content-Type': 'video/mp4',
'Content-Length': stat.size,
});
fs.createReadStream('video.mp4').pipe(res);
}).listen(3000);
// Range requests สำหรับ video seeking
http.createServer((req, res) => {
const range = req.headers.range;
const stat = fs.statSync('video.mp4');
if (range) {
const [start, end] = range.replace('bytes=', '').split('-').map(Number);
const chunkEnd = end || Math.min(start + 10 * 1024 * 1024 - 1, stat.size - 1);
res.writeHead(206, {
'Content-Range': `bytes ${start}-${chunkEnd}/${stat.size}`,
'Content-Length': chunkEnd - start + 1,
'Content-Type': 'video/mp4',
});
fs.createReadStream('video.mp4', { start, end: chunkEnd }).pipe(res);
}
}).listen(3000);