
Pushing S3 Events to RabbitMQ

Cloud storage such as AWS S3 often acts as a landing zone for raw data. You may want to notify other services via a message broker (RabbitMQ, Kafka) as soon as new data has been processed.

The Strategy

We combine S3Source with ParquetReader (or any other reader) and use CallbackWriter to push each record to the queue.

Implementation

```typescript
import { S3Source, ParquetReader, CallbackWriter, Job } from '@pujansrt/data-genie';
import { S3Client } from '@aws-sdk/client-s3';
import amqp from 'amqplib';

const s3Client = new S3Client({ region: 'us-east-1' });

async function run() {
  // Connect to RabbitMQ and make sure the target queue exists.
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();
  const queue = 'user-imports';

  await channel.assertQueue(queue);

  // Read the Parquet file straight from S3.
  const source = new S3Source(s3Client, 'my-bucket', 'exports/users.parquet');
  const reader = new ParquetReader(source);

  // Publish each record to the queue as a JSON message.
  const writer = new CallbackWriter(async (record) => {
    channel.sendToQueue(queue, Buffer.from(JSON.stringify(record)));
  });

  await Job.run(reader, writer);

  await channel.close();
  await conn.close();
}

run().catch(console.error);
```
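One caveat: amqplib's `sendToQueue` follows Node stream semantics. It returns `false` when the channel's internal write buffer is full, and the channel emits a `'drain'` event once it empties. For large files it can be worth honoring that signal instead of publishing blindly. A minimal sketch (the `QueueChannel` interface and helper name are illustrative, not part of either library):

```typescript
import { EventEmitter } from 'node:events';

// Minimal shape of the amqplib channel methods used here (illustrative).
interface QueueChannel extends EventEmitter {
  sendToQueue(queue: string, content: Buffer): boolean;
}

// Publish one record, pausing until 'drain' if the write buffer is full.
async function publishWithBackpressure(
  channel: QueueChannel,
  queue: string,
  record: unknown,
): Promise<void> {
  const ok = channel.sendToQueue(queue, Buffer.from(JSON.stringify(record)));
  if (!ok) {
    // Buffer is full: wait for the channel to signal it has drained.
    await new Promise<void>((resolve) => channel.once('drain', resolve));
  }
}
```

Calling this helper from the `CallbackWriter` callback (and awaiting it) keeps the job from outrunning the broker connection.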

Batching Alternative

If you prefer to send messages in batches for better throughput, use BatchCallbackWriter:

```typescript
const writer = new BatchCallbackWriter(50, async (batch) => {
  // batch holds up to 50 records (the final batch may be smaller)
  batch.forEach(record => {
    channel.sendToQueue(queue, Buffer.from(JSON.stringify(record)));
  });
});
```
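For intuition, the buffering that a batch writer of this kind performs can be sketched as follows. This is an illustrative stand-in, not the library's actual implementation; the class and method names are hypothetical:

```typescript
type BatchHandler<T> = (batch: T[]) => Promise<void>;

// Illustrative sketch of batch buffering (hypothetical, not the library's code).
class BatchBuffer<T> {
  private buffer: T[] = [];

  constructor(private size: number, private handler: BatchHandler<T>) {}

  // Collect records; hand a full batch to the handler.
  async write(record: T): Promise<void> {
    this.buffer.push(record);
    if (this.buffer.length >= this.size) {
      await this.flushBatch();
    }
  }

  // Flush any trailing partial batch at the end of the job.
  async flush(): Promise<void> {
    if (this.buffer.length > 0) {
      await this.flushBatch();
    }
  }

  private async flushBatch(): Promise<void> {
    const batch = this.buffer;
    this.buffer = [];
    await this.handler(batch);
  }
}
```

Batching trades a little latency for fewer handler invocations; the final, possibly partial batch still has to be flushed when the input is exhausted.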

Released under the MIT License.