# stream_large_file

Stream large files progressively without loading them entirely into memory.

## Overview

The `stream_large_file` tool provides an async generator interface for processing large files line by line or chunk by chunk with constant memory usage. This makes it ideal for real-time processing, pipelines, and files that exceed available memory.
## Usage

```json
{
  "tool": "stream_large_file",
  "arguments": {
    "filePath": "/var/log/massive.log",
    "chunkSize": 100
  }
}
```

## Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| filePath | string | Yes | - | Absolute or relative path to the file |
| chunkSize | number | No | 100 | Number of lines per yielded chunk |
| startLine | number | No | 1 | Line number to start streaming from |
| endLine | number | No | EOF | Line number to stop streaming at |

## Response Format
The tool returns an async generator that yields chunks:
```typescript
AsyncGenerator<{
  lines: string[];     // Array of lines in chunk
  chunkIndex: number;  // Current chunk number (0-based)
  startLine: number;   // First line number in chunk
  endLine: number;     // Last line number in chunk
  hasMore: boolean;    // Whether more chunks exist
}>
```

## Examples
### Basic Streaming
Stream the entire file in chunks of 100 lines:
```typescript
const stream = stream_large_file({
  filePath: "/var/log/app.log",
  chunkSize: 100
});

for await (const chunk of stream) {
  console.log(`Processing lines ${chunk.startLine}-${chunk.endLine}`);
  processLines(chunk.lines);
}
```

### Custom Chunk Size
Stream with larger chunks for better throughput:
```typescript
const stream = stream_large_file({
  filePath: "/data/large.csv",
  chunkSize: 1000
});

for await (const chunk of stream) {
  await processCsvChunk(chunk.lines);
}
```

### Partial Streaming
Stream a specific line range:
```typescript
const stream = stream_large_file({
  filePath: "/var/log/system.log",
  startLine: 10000,
  endLine: 20000,
  chunkSize: 500
});

for await (const chunk of stream) {
  console.log(`Chunk ${chunk.chunkIndex}: ${chunk.lines.length} lines`);
}
```

### Progress Tracking
Monitor streaming progress:
```typescript
const stream = stream_large_file({
  filePath: "/data/huge.log",
  chunkSize: 100
});

// totalLines must be known in advance (e.g. via get_file_structure)
let processedLines = 0;

for await (const chunk of stream) {
  processedLines += chunk.lines.length;
  const progress = (processedLines / totalLines) * 100;
  console.log(`Progress: ${progress.toFixed(2)}%`);
  await processChunk(chunk.lines);
}
```

## Common Use Cases
### 1. Log Processing Pipeline
Process logs in real time:
```typescript
async function processLogStream(logFile: string) {
  const stream = stream_large_file({
    filePath: logFile,
    chunkSize: 500
  });
  const errors: string[] = [];
  for await (const chunk of stream) {
    for (const line of chunk.lines) {
      if (line.includes("ERROR")) {
        errors.push(line);
      }
    }
  }
  return errors;
}
```

### 2. Data Transformation
Transform CSV data row by row:
```typescript
import fs from "node:fs";

async function transformCsv(inputFile: string, outputFile: string) {
  const stream = stream_large_file({
    filePath: inputFile,
    chunkSize: 1000
  });
  const writer = fs.createWriteStream(outputFile);
  for await (const chunk of stream) {
    const transformed = chunk.lines
      .map(line => transformRow(line))
      .join("\n");
    writer.write(transformed + "\n");
  }
  writer.end();
}
```

### 3. Statistical Analysis
Calculate statistics without loading the full file:
```typescript
async function calculateStats(dataFile: string) {
  const stream = stream_large_file({
    filePath: dataFile,
    chunkSize: 1000
  });
  let sum = 0;
  let count = 0;
  let min = Infinity;
  let max = -Infinity;
  for await (const chunk of stream) {
    for (const line of chunk.lines) {
      const value = parseFloat(line);
      if (Number.isNaN(value)) continue; // skip blank or non-numeric lines
      sum += value;
      count++;
      min = Math.min(min, value);
      max = Math.max(max, value);
    }
  }
  return {
    average: sum / count,
    min,
    max,
    count
  };
}
```

### 4. Search and Extract
Find and extract matching lines:
```typescript
async function extractMatches(logFile: string, pattern: RegExp) {
  const stream = stream_large_file({
    filePath: logFile,
    chunkSize: 500
  });
  const matches: Array<{ lineNumber: number; content: string }> = [];
  for await (const chunk of stream) {
    chunk.lines.forEach((line, index) => {
      if (pattern.test(line)) {
        matches.push({
          lineNumber: chunk.startLine + index,
          content: line
        });
      }
    });
  }
  return matches;
}
```

### 5. Real-time Monitoring
Monitor a log file as it grows:
```typescript
async function monitorLog(logFile: string) {
  const stream = stream_large_file({
    filePath: logFile,
    chunkSize: 10
  });
  for await (const chunk of stream) {
    for (const line of chunk.lines) {
      if (line.includes("ERROR") || line.includes("FATAL")) {
        console.error("🚨 Alert:", line);
        sendAlert(line);
      }
    }
    // Brief pause before the next chunk (sleep is a small helper: ms => new Promise(r => setTimeout(r, ms)))
    await sleep(100);
  }
}
```

## Performance
| File Size | Chunk Size | Memory Usage | Throughput | Notes |
|---|---|---|---|---|
| < 10MB | 100 | < 1MB | 10K lines/s | Fast |
| 10-100MB | 500 | < 5MB | 50K lines/s | Optimal |
| 100MB-1GB | 1000 | < 10MB | 100K lines/s | High throughput |
| > 1GB | 5000 | < 50MB | 200K lines/s | Maximum efficiency |
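
As a rough rule of thumb, the chunk size can be derived from the file size using the thresholds in the table above. The helper below is a hypothetical sketch, not part of the tool; it uses Node's `fs.statSync` to read the file size:

```typescript
import { statSync } from "node:fs";

// Hypothetical helper: map file size to a chunk size using the thresholds above.
function suggestChunkSize(filePath: string): number {
  const bytes = statSync(filePath).size;
  const MB = 1024 * 1024;
  if (bytes < 10 * MB) return 100;     // < 10MB
  if (bytes < 100 * MB) return 500;    // 10-100MB
  if (bytes < 1024 * MB) return 1000;  // 100MB-1GB
  return 5000;                         // > 1GB
}

const stream = stream_large_file({
  filePath: "/data/large.csv",
  chunkSize: suggestChunkSize("/data/large.csv")
});
```
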
### Memory Characteristics
- **Constant Memory**: O(chunkSize), independent of file size (see the sketch below)
- **No Full Load**: Never loads the entire file into memory
- **Streaming**: True line-by-line streaming
- **Backpressure**: Automatic backpressure handling
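
For intuition about the memory model, a chunked line stream with the same O(chunkSize) profile can be built on Node's `readline` module: only the current chunk is ever held in memory. This is a minimal illustrative sketch, not the tool's actual implementation, and the helper name `streamLinesInChunks` is made up here:

```typescript
import { createReadStream } from "node:fs";
import { createInterface } from "node:readline";

// Illustrative sketch of O(chunkSize) streaming; not the tool's real implementation.
async function* streamLinesInChunks(filePath: string, chunkSize = 100) {
  const rl = createInterface({
    input: createReadStream(filePath),
    crlfDelay: Infinity // treat \r\n as a single line break
  });
  let buffer: string[] = [];
  let startLine = 1;
  for await (const line of rl) {
    buffer.push(line);
    if (buffer.length === chunkSize) {
      // Only this buffer (at most chunkSize lines) is held in memory at once
      yield { lines: buffer, startLine, endLine: startLine + buffer.length - 1 };
      startLine += buffer.length;
      buffer = [];
    }
  }
  if (buffer.length > 0) {
    yield { lines: buffer, startLine, endLine: startLine + buffer.length - 1 };
  }
}
```
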
## Best Practices

### 1. Choose Optimal Chunk Size
Balance memory and throughput:
```typescript
// Small files or real-time processing: small chunks
const realtimeStream = stream_large_file({
  filePath: "current.log",
  chunkSize: 10 // Low latency
});

// Large files: larger chunks
const batchStream = stream_large_file({
  filePath: "archive.log",
  chunkSize: 5000 // High throughput
});
```

### 2. Handle Errors Gracefully
```typescript
async function safeStream(filePath: string) {
  try {
    const stream = stream_large_file({ filePath, chunkSize: 100 });
    for await (const chunk of stream) {
      try {
        await processChunk(chunk);
      } catch (error) {
        console.error(`Error in chunk ${chunk.chunkIndex}:`, error);
        // Continue processing other chunks
      }
    }
  } catch (error) {
    console.error("Stream error:", error);
  }
}
```

### 3. Implement Backpressure
Control processing rate:
```typescript
async function streamWithBackpressure(filePath: string) {
  const stream = stream_large_file({ filePath, chunkSize: 100 });
  const maxConcurrent = 5;
  const inFlight = new Set<Promise<void>>();
  for await (const chunk of stream) {
    // Each task removes itself from the pool when it settles
    const task: Promise<void> = processChunk(chunk).finally(() => inFlight.delete(task));
    inFlight.add(task);
    // Wait for a slot if the pool is full
    if (inFlight.size >= maxConcurrent) {
      await Promise.race(inFlight);
    }
  }
  // Wait for remaining tasks
  await Promise.all(inFlight);
}
```

### 4. Progress Reporting
Show user-friendly progress:
```typescript
async function streamWithProgress(filePath: string) {
  // Get total lines first
  const structure = await get_file_structure({ filePath });
  const totalLines = structure.totalLines;
  const stream = stream_large_file({ filePath, chunkSize: 1000 });
  let processedLines = 0;
  for await (const chunk of stream) {
    processedLines += chunk.lines.length;
    const percent = ((processedLines / totalLines) * 100).toFixed(1);
    console.log(`Progress: ${percent}% (${processedLines}/${totalLines})`);
    await processChunk(chunk);
  }
}
```

### 5. Parallel Processing
Process chunks in parallel:
```typescript
async function parallelStream(filePath: string) {
  const stream = stream_large_file({ filePath, chunkSize: 100 });
  const workers = 4;
  const chunks: any[] = [];
  for await (const chunk of stream) {
    chunks.push(chunk);
    // Process in batches
    if (chunks.length >= workers) {
      await Promise.all(chunks.map(c => processChunk(c)));
      chunks.length = 0;
    }
  }
  // Process remaining
  if (chunks.length > 0) {
    await Promise.all(chunks.map(c => processChunk(c)));
  }
}
```

## Error Handling
### File Not Found
```typescript
try {
  const stream = stream_large_file({ filePath: "missing.log" });
  for await (const chunk of stream) {
    // ...
  }
} catch (error) {
  console.error("File not found:", error);
}
```

### Permission Denied
```typescript
try {
  const stream = stream_large_file({ filePath: "/root/protected.log" });
  for await (const chunk of stream) {
    // ...
  }
} catch (error) {
  console.error("Permission denied:", error);
}
```

### Invalid Range
```typescript
const stream = stream_large_file({
  filePath: "file.log",
  startLine: 10000,
  endLine: 5000 // Error: endLine < startLine
});
```
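
To fail fast before any I/O starts, the range can be checked up front. A minimal sketch; the `assertValidRange` helper is hypothetical and not part of the tool:

```typescript
// Hypothetical helper: reject an inverted range before calling stream_large_file.
function assertValidRange(startLine?: number, endLine?: number): void {
  if (startLine !== undefined && endLine !== undefined && endLine < startLine) {
    throw new RangeError(`endLine (${endLine}) must be >= startLine (${startLine})`);
  }
}

assertValidRange(10000, 5000); // throws RangeError before streaming begins
```
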
## Comparison with Other Methods
| Method | Memory | Speed | Use Case |
|---|---|---|---|
| stream_large_file | O(chunkSize) | Fast | Real-time, large files |
| read_large_file_chunk | O(chunkSize) | Faster | Random access |
| fs.readFileSync | O(fileSize) | Fastest | Small files only |
| readline | O(1) | Slow | Line-by-line |
## Advanced Patterns

### 1. Transform Stream
```typescript
async function* transformStream(filePath: string, transformer: (line: string) => string) {
  const stream = stream_large_file({ filePath, chunkSize: 100 });
  for await (const chunk of stream) {
    const transformed = chunk.lines.map(transformer);
    yield {
      ...chunk,
      lines: transformed
    };
  }
}

// Usage
const transformed = transformStream("/data/input.csv", line =>
  line.toUpperCase()
);
for await (const chunk of transformed) {
  console.log(chunk.lines);
}
```

### 2. Filter Stream
```typescript
async function* filterStream(filePath: string, predicate: (line: string) => boolean) {
  const stream = stream_large_file({ filePath, chunkSize: 100 });
  for await (const chunk of stream) {
    const filtered = chunk.lines.filter(predicate);
    if (filtered.length > 0) {
      yield {
        ...chunk,
        lines: filtered
      };
    }
  }
}

// Usage
const errors = filterStream("/var/log/app.log", line =>
  line.includes("ERROR")
);
for await (const chunk of errors) {
  console.log("Error chunk:", chunk.lines);
}
```

### 3. Reduce Stream
```typescript
async function reduceStream<T>(
  filePath: string,
  reducer: (acc: T, line: string) => T,
  initialValue: T
): Promise<T> {
  const stream = stream_large_file({ filePath, chunkSize: 1000 });
  let accumulator = initialValue;
  for await (const chunk of stream) {
    for (const line of chunk.lines) {
      accumulator = reducer(accumulator, line);
    }
  }
  return accumulator;
}

// Usage: Count lines with "ERROR"
const errorCount = await reduceStream(
  "/var/log/app.log",
  (count, line) => count + (line.includes("ERROR") ? 1 : 0),
  0
);
```

## See Also
- Tools Overview - All available tools
- read_large_file_chunk - Chunk-based reading
- search_in_large_file - Search functionality
- Performance Guide - Optimization tips
- Best Practices - Usage recommendations
- CSV Processing Example - Real-world streaming