# CSV Processing Examples
Efficient processing of large CSV files with the Large File MCP Server.
## Overview
CSV files can grow to millions of rows in data processing pipelines. This guide demonstrates memory-efficient patterns for CSV analysis, validation, and transformation. For clarity, the examples split rows with a plain `split(",")`, which breaks on quoted fields that contain commas; use a real CSV parser for data like that.
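If a file does contain quoted fields, a quote-aware splitter is straightforward to sketch. The `splitCsvLine` helper below is illustrative only (not part of the server API); it handles commas and escaped quotes inside double-quoted fields, but not embedded newlines:

```typescript
// Minimal quote-aware CSV line splitter (illustrative helper, not part of
// the Large File MCP Server API). Handles commas inside double-quoted
// fields and escaped quotes (""), but not embedded newlines.
function splitCsvLine(line: string): string[] {
  const fields: string[] = [];
  let current = "";
  let inQuotes = false;

  for (let i = 0; i < line.length; i++) {
    const char = line[i];
    if (inQuotes) {
      if (char === '"' && line[i + 1] === '"') {
        current += '"'; // Escaped quote inside a quoted field
        i++;
      } else if (char === '"') {
        inQuotes = false;
      } else {
        current += char;
      }
    } else if (char === '"') {
      inQuotes = true;
    } else if (char === ",") {
      fields.push(current);
      current = "";
    } else {
      current += char;
    }
  }
  fields.push(current);
  return fields;
}

// splitCsvLine('a,"b,c",d') => ["a", "b,c", "d"]
```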
## Basic CSV Operations
### Read CSV with Headers
Parse a CSV file, treating the first line as the header row:
```typescript
async function readCsvWithHeaders(csvFile: string) {
  // Read first chunk to get headers
  const firstChunk = await read_large_file_chunk({
    filePath: csvFile,
    chunkIndex: 0
  });

  const lines = firstChunk.content.split("\n");
  const headers = lines[0].split(",").map(h => h.trim());

  console.log("CSV Headers:", headers);
  console.log("Total rows:", firstChunk.totalLines - 1); // Exclude header

  // Read first few data rows
  const dataRows = lines.slice(1, 6);
  console.log("\nFirst 5 rows:");

  dataRows.forEach((row, idx) => {
    const values = row.split(",");
    console.log(`Row ${idx + 1}:`);
    headers.forEach((header, i) => {
      console.log(`  ${header}: ${values[i]}`);
    });
    console.log();
  });
}

await readCsvWithHeaders("/data/sales.csv");
```

### Stream CSV Processing
Process large CSV files row by row:
```typescript
async function processCsv(csvFile: string) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  let totalRows = 0;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    // Extract headers from first chunk
    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      lines.shift(); // Remove header row
      isFirstChunk = false;
    }

    // Process data rows
    for (const line of lines) {
      const values = line.split(",");
      const row = Object.fromEntries(
        headers.map((header, i) => [header, values[i]])
      );
      await processRow(row);
      totalRows++;
    }

    console.log(`Processed ${totalRows} rows...`);
  }

  console.log(`\nComplete! Total rows: ${totalRows}`);
}

async function processRow(row: Record<string, string>) {
  // Your processing logic here
  // e.g., validation, transformation, database insert
}

await processCsv("/data/transactions.csv");
```
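As a concrete illustration of what `processRow` might do in place of the stub above, here is a hypothetical implementation that flags negative transaction amounts; the `id` and `amount` column names are assumptions about the file, not part of the API:

```typescript
// Hypothetical processRow: flag transactions with a negative amount.
// The "id" and "amount" column names are assumptions about the file.
async function processRow(row: Record<string, string>) {
  const amount = parseFloat(row.amount);
  if (!isNaN(amount) && amount < 0) {
    console.warn(`Transaction ${row.id}: negative amount ${amount}`);
  }
}
```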
## Data Validation
### Validate CSV Structure
Check for structural issues:
```typescript
async function validateCsvStructure(csvFile: string) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  const errors: Array<{ row: number; issue: string }> = [];
  let currentRow = 0;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",");
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      currentRow++;
      const values = line.split(",");

      // Check column count
      if (values.length !== headers.length) {
        errors.push({
          row: currentRow,
          issue: `Column count mismatch: expected ${headers.length}, got ${values.length}`
        });
      }

      // Check for empty values
      values.forEach((value, idx) => {
        if (!value.trim()) {
          errors.push({
            row: currentRow,
            issue: `Empty value in column "${headers[idx]}"`
          });
        }
      });

      // Limit error collection
      if (errors.length >= 100) break;
    }
    if (errors.length >= 100) break;
  }

  // Report results
  if (errors.length === 0) {
    console.log("✓ CSV structure is valid");
  } else {
    console.log(`✗ Found ${errors.length} issues:\n`);
    errors.slice(0, 20).forEach(error => {
      console.log(`Row ${error.row}: ${error.issue}`);
    });
    if (errors.length > 20) {
      console.log(`\n... and ${errors.length - 20} more issues`);
    }
  }
}

await validateCsvStructure("/data/import.csv");
```

### Data Type Validation
Validate data types in columns:
```typescript
interface ColumnSchema {
  name: string;
  type: "string" | "number" | "date" | "email";
  required?: boolean;
}

async function validateDataTypes(
  csvFile: string,
  schema: ColumnSchema[]
) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  const errors: string[] = [];
  let currentRow = 0;

  const validators = {
    number: (val: string) => !isNaN(parseFloat(val)),
    date: (val: string) => !isNaN(Date.parse(val)),
    email: (val: string) => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(val),
    string: () => true
  };

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      currentRow++;
      const values = line.split(",");

      schema.forEach(column => {
        // Look up the column by name so the schema does not have to
        // mirror the file's column order
        const idx = headers.indexOf(column.name);
        const value = idx === -1 ? undefined : values[idx]?.trim();

        // Check required
        if (column.required && !value) {
          errors.push(
            `Row ${currentRow}: Missing required value in "${column.name}"`
          );
          return;
        }

        // Check type
        if (value && !validators[column.type](value)) {
          errors.push(
            `Row ${currentRow}: Invalid ${column.type} in "${column.name}": "${value}"`
          );
        }
      });

      if (errors.length >= 100) break;
    }
    if (errors.length >= 100) break;
  }

  // Report
  if (errors.length === 0) {
    console.log("✓ All data types are valid");
  } else {
    console.log(`✗ Found ${errors.length} validation errors:\n`);
    errors.slice(0, 20).forEach(error => console.log(error));
    if (errors.length > 20) {
      console.log(`\n... and ${errors.length - 20} more errors`);
    }
  }
}

// Usage
await validateDataTypes("/data/users.csv", [
  { name: "id", type: "number", required: true },
  { name: "email", type: "email", required: true },
  { name: "created_at", type: "date", required: true },
  { name: "age", type: "number" }
]);
```
## Data Analysis
### Calculate Statistics
Compute a statistical summary for a numeric column:
```typescript
async function calculateStats(csvFile: string, columnName: string) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  let columnIndex = -1;
  const values: number[] = [];

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      columnIndex = headers.indexOf(columnName);
      if (columnIndex === -1) {
        throw new Error(`Column "${columnName}" not found`);
      }
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      const cols = line.split(",");
      const value = parseFloat(cols[columnIndex]);
      if (!isNaN(value)) {
        values.push(value);
      }
    }
  }

  // Guard against an empty column before computing statistics
  if (values.length === 0) {
    throw new Error(`Column "${columnName}" contains no numeric values`);
  }

  // Calculate statistics
  values.sort((a, b) => a - b);
  const sum = values.reduce((a, b) => a + b, 0);
  const mean = sum / values.length;
  const median = values[Math.floor(values.length / 2)];
  const min = values[0];
  const max = values[values.length - 1];

  // Standard deviation
  const squaredDiffs = values.map(v => Math.pow(v - mean, 2));
  const variance = squaredDiffs.reduce((a, b) => a + b, 0) / values.length;
  const stdDev = Math.sqrt(variance);

  console.log(`=== Statistics for "${columnName}" ===\n`);
  console.log(`Count: ${values.length}`);
  console.log(`Mean: ${mean.toFixed(2)}`);
  console.log(`Median: ${median.toFixed(2)}`);
  console.log(`Std Dev: ${stdDev.toFixed(2)}`);
  console.log(`Min: ${min.toFixed(2)}`);
  console.log(`Max: ${max.toFixed(2)}`);
  console.log(`Range: ${(max - min).toFixed(2)}`);
}

await calculateStats("/data/sales.csv", "amount");
```
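Note that `calculateStats` keeps every value in memory so it can sort for an exact median. When even one column's values won't fit, mean and standard deviation can still be computed in constant memory with Welford's online algorithm; a sketch (it gives up the exact median, which requires the sorted values):

```typescript
// Welford's online algorithm: mean and standard deviation in O(1) memory.
// Feed values one at a time; no array of values is ever kept.
class RunningStats {
  private n = 0;
  private mean = 0;
  private m2 = 0; // Sum of squared differences from the running mean

  push(value: number) {
    this.n++;
    const delta = value - this.mean;
    this.mean += delta / this.n;
    this.m2 += delta * (value - this.mean);
  }

  get count() { return this.n; }
  get average() { return this.mean; }
  get stdDev() { return this.n > 0 ? Math.sqrt(this.m2 / this.n) : 0; }
}

// Inside the streaming loop: stats.push(value) instead of values.push(value)
```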
### Group By and Aggregate
Calculate aggregates by category:
```typescript
async function groupByAndSum(
  csvFile: string,
  groupColumn: string,
  sumColumn: string
) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  let groupIdx = -1;
  let sumIdx = -1;
  const groups = new Map<string, number>();

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      groupIdx = headers.indexOf(groupColumn);
      sumIdx = headers.indexOf(sumColumn);
      if (groupIdx === -1 || sumIdx === -1) {
        throw new Error(
          `Column "${groupIdx === -1 ? groupColumn : sumColumn}" not found`
        );
      }
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      const cols = line.split(",");
      const groupKey = cols[groupIdx].trim();
      const value = parseFloat(cols[sumIdx]);
      if (!isNaN(value)) {
        groups.set(groupKey, (groups.get(groupKey) || 0) + value);
      }
    }
  }

  // Display results
  console.log(`=== ${sumColumn} by ${groupColumn} ===\n`);
  Array.from(groups.entries())
    .sort(([, a], [, b]) => b - a)
    .forEach(([key, sum]) => {
      console.log(`${key}: ${sum.toFixed(2)}`);
    });
}

await groupByAndSum("/data/sales.csv", "category", "amount");
```
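The same accumulator pattern extends to other aggregates. A sketch that tracks count and sum per group so a per-group average falls out at the end; it would replace the single-number `Map` inside `groupByAndSum`:

```typescript
// Sketch: track count and sum per group to derive an average.
// Replaces the Map<string, number> accumulator in groupByAndSum.
const groups = new Map<string, { count: number; sum: number }>();

function accumulate(groupKey: string, value: number) {
  const entry = groups.get(groupKey) ?? { count: 0, sum: 0 };
  entry.count++;
  entry.sum += value;
  groups.set(groupKey, entry);
}

function report() {
  for (const [key, { count, sum }] of groups) {
    console.log(
      `${key}: count=${count}, sum=${sum.toFixed(2)}, avg=${(sum / count).toFixed(2)}`
    );
  }
}
```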
## Data Transformation
### Filter and Export
Filter rows and write them to a new file:
```typescript
import fs from "node:fs";

async function filterCsv(
  inputFile: string,
  outputFile: string,
  predicate: (row: Record<string, string>) => boolean
) {
  const stream = stream_large_file({
    filePath: inputFile,
    chunkSize: 1000
  });
  const writer = fs.createWriteStream(outputFile);

  let headers: string[] = [];
  let isFirstChunk = true;
  let filteredCount = 0;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      writer.write(lines[0] + "\n"); // Write headers
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      const values = line.split(",");
      const row = Object.fromEntries(
        headers.map((h, i) => [h, values[i]])
      );
      if (predicate(row)) {
        writer.write(line + "\n");
        filteredCount++;
      }
    }
  }

  // Wait for the stream to flush before reporting
  await new Promise<void>(resolve => writer.end(() => resolve()));
  console.log(`Filtered ${filteredCount} rows to ${outputFile}`);
}

// Usage: Export only completed transactions
await filterCsv(
  "/data/transactions.csv",
  "/data/completed.csv",
  row => row.status === "completed"
);
```
### Transform Columns
Apply transformations to columns:
```typescript
import fs from "node:fs";

async function transformCsv(
  inputFile: string,
  outputFile: string,
  transforms: Record<string, (value: string) => string>
) {
  const stream = stream_large_file({
    filePath: inputFile,
    chunkSize: 1000
  });
  const writer = fs.createWriteStream(outputFile);

  let headers: string[] = [];
  let isFirstChunk = true;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      writer.write(lines[0] + "\n");
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      const values = line.split(",");
      const transformed = values.map((value, idx) => {
        const header = headers[idx];
        if (transforms[header]) {
          return transforms[header](value);
        }
        return value;
      });
      writer.write(transformed.join(",") + "\n");
    }
  }

  // Wait for the stream to flush
  await new Promise<void>(resolve => writer.end(() => resolve()));
}

// Usage: Normalize data
await transformCsv(
  "/data/users.csv",
  "/data/users_normalized.csv",
  {
    email: email => email.toLowerCase(),
    name: name => name.trim().toUpperCase(),
    created_at: date => new Date(date).toISOString()
  }
);
```
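One caveat in the usage above: `new Date(value).toISOString()` throws a `RangeError` whenever a cell doesn't parse as a date. A defensive variant that passes unparseable values through unchanged:

```typescript
// Defensive date transform: pass unparseable values through unchanged
// instead of letting toISOString() throw on an Invalid Date.
const safeIsoDate = (value: string): string => {
  const parsed = new Date(value);
  return isNaN(parsed.getTime()) ? value : parsed.toISOString();
};

// e.g. { created_at: safeIsoDate } in the transforms map
```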
## Data Quality
### Find Duplicates
Detect duplicate rows:
```typescript
async function findDuplicates(
  csvFile: string,
  keyColumns: string[]
) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  let keyIndices: number[] = [];
  const seen = new Map<string, number[]>();
  let currentRow = 0;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      // Resolve key columns once, not on every row
      keyIndices = keyColumns.map(col => headers.indexOf(col));
      if (keyIndices.includes(-1)) {
        throw new Error("Key column not found");
      }
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      currentRow++;
      const values = line.split(",");

      // Create key from specified columns
      const key = keyIndices.map(idx => values[idx]).join("|");

      if (!seen.has(key)) {
        seen.set(key, []);
      }
      seen.get(key)!.push(currentRow);
    }
  }

  // Find duplicates
  const duplicates = Array.from(seen.entries())
    .filter(([, rows]) => rows.length > 1)
    .map(([key, rows]) => ({ key, rows }));

  console.log(`=== Duplicate Analysis ===\n`);
  console.log(`Total rows: ${currentRow}`);
  console.log(`Unique: ${seen.size}`);
  console.log(`Duplicates: ${duplicates.length}\n`);

  if (duplicates.length > 0) {
    console.log("Top duplicates:");
    duplicates
      .sort((a, b) => b.rows.length - a.rows.length)
      .slice(0, 10)
      .forEach(({ key, rows }) => {
        console.log(`${key}: ${rows.length} occurrences at rows ${rows.join(", ")}`);
      });
  }
}

await findDuplicates("/data/users.csv", ["email"]);
```
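Storing every row number per key can itself use significant memory on files with millions of rows. If you only need to know which keys repeat and how often, a lighter accumulator keeps one counter per key; a sketch that would replace the `seen` map above:

```typescript
// Sketch: count occurrences per key instead of storing row numbers.
// One number per unique key, regardless of how often it repeats.
const counts = new Map<string, number>();

function recordKey(key: string) {
  counts.set(key, (counts.get(key) ?? 0) + 1);
}

function duplicateKeys(): Array<{ key: string; count: number }> {
  return Array.from(counts.entries())
    .filter(([, count]) => count > 1)
    .map(([key, count]) => ({ key, count }));
}
```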
### Check Data Completeness
Analyze missing values:
```typescript
async function analyzeCompleteness(csvFile: string) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  const missing = new Map<string, number>();
  let totalRows = 0;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      headers.forEach(h => missing.set(h, 0));
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      totalRows++;
      const values = line.split(",");
      // Iterate headers rather than values so that short rows
      // still count their trailing columns as missing
      headers.forEach((header, idx) => {
        if (!values[idx]?.trim()) {
          missing.set(header, (missing.get(header) || 0) + 1);
        }
      });
    }
  }

  // Report
  console.log(`=== Data Completeness ===\n`);
  console.log(`Total rows: ${totalRows}\n`);

  Array.from(missing.entries())
    .sort(([, a], [, b]) => b - a)
    .forEach(([column, count]) => {
      const percent = ((count / totalRows) * 100).toFixed(1);
      console.log(`${column}: ${count} missing (${percent}%)`);
      if (count / totalRows > 0.1) {
        console.log(`  ⚠ High missing rate`);
      }
    });
}

await analyzeCompleteness("/data/dataset.csv");
```

## Best Practices
### 1. Always Use Streaming for Large Files
```typescript
// Good: Streaming approach
async function processLargeCsv(file: string) {
  const stream = stream_large_file({
    filePath: file,
    chunkSize: 1000
  });
  for await (const chunk of stream) {
    await processChunk(chunk.lines);
  }
}

// Bad: Loading entire file
const allData = fs.readFileSync(file, "utf-8").split("\n");
```

### 2. Handle Headers Correctly
```typescript
let isFirstChunk = true;
let headers: string[] = [];

for await (const chunk of stream) {
  let lines = chunk.lines;
  if (isFirstChunk) {
    headers = lines[0].split(",");
    lines = lines.slice(1); // Remove header
    isFirstChunk = false;
  }
  // Process data rows
}
```

### 3. Validate Before Processing
```typescript
// Validate structure first
await validateCsvStructure(file);

// Then process
await processCsv(file);
```

## See Also
- API Reference - All available tools
- stream_large_file - Streaming documentation
- Log Analysis - Log processing examples
- Performance Guide - Optimization tips
- Best Practices - Usage recommendations