Creating a New JavaScript Source Plugin from Scratch

October 1, 2023

Michal Brutvan
Twitter: pilvikala

In this blog post, I will walk you through the steps to create a new source plugin using the CloudQuery JavaScript SDK. The goal is to give you a basic overview of how to create tables and columns and how to sync the actual data.

What You Need to Know

Make sure you go over the core concepts of CloudQuery and the JavaScript SDK.

I will be using TypeScript for IDE autocompletion and code type safety.

Prerequisites

To build and release the plugin for public use, you will need Docker installed locally.

The Plan

At a high level, this is what I would like to achieve:

  • The plugin should support reading a set of CSV files from a directory.
  • Each file will be synced as a single table in the destination.
  • The column names will be read from the first row.
  • The plugin should recognize basic types. I will start with numbers; everything else will be a string.

Here's what I imagine the configuration will look like:

kind: source
spec:
  name: "text-file"
  registry: "grpc"  # for local testing only, this will change once the plugin is published
  path: "localhost:7777" # same as above
  version: "v1.0.0"
  tables:
    ["*"]
  destinations:
    - "sqlite"
  spec:
    path: "test_data/" # specify folder or a single file
    csvDelimiter: ";"

---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  version: "v2.4.9"
  spec:
    connection_string: ./db.sql

Getting Started

We will not start completely from scratch but rather use a template with blanks to fill in. There is a javascript-plugin-template starter repository that you can clone.

To install all dependencies and test whether it is running, run the following commands:

npm install 
npm run dev # this will run the plugin in dev mode as a server listening on localhost:7777
cloudquery sync sync.yml # run this in a new terminal tab

This will create a new SQLite database file named db.sql containing a table called Names with two records. You can open this file using DB Browser for SQLite or any other SQLite client.

What's inside

The main content is in the src folder:

  • main.ts - The main wrapper for the plugin. Its responsibility is to start the plugin that the CLI will communicate with when running the sync.
  • plugin.ts - The body of the plugin. The main responsibility of this module is to return an initialized instance of the plugin to serve.
  • spec.ts - This module is responsible for handling and validating plugin configuration.
  • tables.ts - This is the module where the main work happens and the one we'll be working with the most.
  • tables.test.ts - Sample unit tests for the module above.

How it all works

When CloudQuery runs a sync, it calls the plugin client's sync function and expects the plugin to load the tables. We will implement the functions that return the tables to the plugin client.

The plugin needs other functions as well, but the SDK provides a basic implementation of them, so we will not need to implement anything else; we will rely on the implementation returned by the newPlugin function.

The code below (see plugin.ts) creates a newClient function that the SDK will call. It connects the plugin configuration passed in spec with getTables, where the actual implementation will happen.

const newClient: NewClientFunction = async (
    logger,
    spec,
    { noConnection },
  ) => {
    pluginClient.spec = parseSpec(spec);
    pluginClient.client = { id: () => "cq-js-sample" };
    if (noConnection) {
      pluginClient.allTables = [];
      return pluginClient;
    }
    pluginClient.allTables = await getTables();

    return pluginClient;
  };

  pluginClient.plugin = newPlugin("cq-js-sample", version, newClient);

getTables is a function exported from the tables module. It is an asynchronous function that returns an array of tables that will be passed to the destination plugin. In this sample plugin, it returns only one table.

// tables.ts

export const getTables = async (
): Promise<Table[]> => {
 
  const table = await getTable();
  return [table];
};

The table returned is specified by a definition of columns and a tableResolver, which takes care of writing to a stream provided by the SDK.


  const columnNames = ["First Name", "Last Name"];
  const tableRecords = [{"First Name": "Jack", "Last Name": "Bauer"}, {"First Name": "Thomas", "Last Name": "Kirkman"}]
  
  // ...

  const tableResolver: TableResolver = (clientMeta, parent, stream) => {
    for (const r of tableRecords) stream.write(r)
    return Promise.resolve();
  };
  return createTable({ name: "Names", columns: columnDefinitions, resolver: tableResolver });

We will need to change this to get the tableRecords from a CSV file.

Let's Code!

Plugin configuration

We will start with the plugin configuration. To parse a set of CSV files, we'll need the following:

  • a path to where the files are; mandatory
  • a CSV delimiter; defaults to ","

This is what the spec will look like in the YAML for CloudQuery CLI:

  spec:
    path: "test_data/"
    csvDelimiter: ";"

We will define the spec in the spec.ts file:

const spec = {
  type: "object",
  properties: {
    concurrency: { type: "integer" },
    path: { type: "string" },
    csvDelimiter: {type: "string" },
  },
  required: ["path"],
};

const ajv = new Ajv.default();
const validate = ajv.compile(spec);

export type Spec = {
  concurrency: number;
  path: string;
  csvDelimiter: string;
};

export const parseSpec = (spec: string): Spec => {
  const parsed = JSON.parse(spec) as Partial<Spec>;
  const valid = validate(parsed);
  if (!valid) {
    throw new Error(`Invalid spec: ${JSON.stringify(validate.errors)}`);
  }
  const {
    concurrency = 10_000,
    path = "",
    csvDelimiter = ",",
  } = camelcaseKeys(parsed);
  return { concurrency, path, csvDelimiter };
};
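
To make the defaulting behavior concrete, here is a minimal, dependency-free sketch of the same idea. It is not the template's code: parseSpecSketch and toCamelCase are hypothetical names, and the hand-written checks only stand in for the Ajv schema validation and the camelcaseKeys call used above. Unlike the real function, the sketch normalizes the keys before validating them.

```typescript
// Illustrative sketch only: hand-written checks stand in for Ajv and camelcaseKeys.
type SpecSketch = {
  concurrency: number;
  path: string;
  csvDelimiter: string;
};

// Hypothetical helper: converts snake_case keys such as csv_delimiter to camelCase.
const toCamelCase = (key: string): string =>
  key.replace(/_([a-z])/g, (_match: string, letter: string) => letter.toUpperCase());

const parseSpecSketch = (spec: string): SpecSketch => {
  const raw = JSON.parse(spec) as Record<string, unknown>;
  const normalized: Record<string, unknown> = {};
  for (const key of Object.keys(raw)) {
    normalized[toCamelCase(key)] = raw[key];
  }

  // path is mandatory; the other two options fall back to defaults.
  const path = normalized["path"];
  const concurrency = normalized["concurrency"] ?? 10_000;
  const csvDelimiter = normalized["csvDelimiter"] ?? ",";

  if (typeof path !== "string") {
    throw new Error("Invalid spec: path is required and must be a string");
  }
  if (typeof concurrency !== "number" || !Number.isInteger(concurrency)) {
    throw new Error("Invalid spec: concurrency must be an integer");
  }
  if (typeof csvDelimiter !== "string") {
    throw new Error("Invalid spec: csvDelimiter must be a string");
  }
  return { concurrency, path, csvDelimiter };
};

// Only path and the delimiter are given, so concurrency falls back to its default.
const exampleSpec = parseSpecSketch('{"path": "test_data/", "csv_delimiter": ";"}');
```

With this input, exampleSpec.csvDelimiter is ";" and exampleSpec.concurrency is the default of 10000, while a spec without path is rejected.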

This will now get passed to our getTables function in tables.ts, so we can add it as an argument there. We'll also add a logger and pass it from the plugin initialization.

export const getTables = async (
  logger: Logger,
  spec: Spec
): Promise<Table[]> => {
  const table = await getTable();
  return [table];
};

Reading the CSV files

This should be fairly straightforward. We will use the fs module to get the list of files in the provided path and then load the files using csv-parse. We will add a function to get the CSV file paths and another one to parse a file.

Note: Install the csv-parse package using npm install csv-parse.

// update imports:
import type { Logger } from "winston";
import fs from "node:fs/promises";
import Path from "node:path";
import { parse } from "csv-parse";

// ...

const getCsvFiles = async (logger: Logger, path: string): Promise<string[]> => {
  const stats = await fs.stat(path);
  if (stats.isDirectory()) {
    const files = await fs.readdir(path, { withFileTypes: true });
    return files.filter((f) => f.isFile()).map((f) => Path.join(path, f.name));
  }
  // The configuration allows pointing at a single file instead of a folder.
  if (stats.isFile()) {
    return [path];
  }
  logger.error("Target path is neither a directory nor a file.");
  return [];
};

const parseCsvFile = async (path: string, csvDelimiter: string): Promise<string[][]> => {
  const content = await fs.readFile(path);
  return new Promise<string[][]>((resolve, reject) => {
    parse(content, { delimiter: csvDelimiter }, (error, records) => {
      if (error) {
        reject(error);
        return;
      }
      resolve(records);
    });
  });
}
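
To try the directory scan in isolation, the following sketch uses only Node's standard library. It rests on a few assumptions: listCsvFiles and createSampleDirectory are hypothetical names, and the sketch additionally keeps only entries with a .csv extension, which the function above does not do.

```typescript
import { mkdtemp, mkdir, readdir, writeFile } from "node:fs/promises";
import * as os from "node:os";
import * as path from "node:path";

// Hypothetical variant of the directory scan that keeps only regular *.csv files.
const listCsvFiles = async (directory: string): Promise<string[]> => {
  const entries = await readdir(directory, { withFileTypes: true });
  return entries
    .filter((entry) => entry.isFile() && path.extname(entry.name).toLowerCase() === ".csv")
    .map((entry) => path.join(directory, entry.name))
    .sort();
};

// Builds a throwaway directory with two CSV files, one text file, and a subdirectory.
const createSampleDirectory = async (): Promise<string> => {
  const directory = await mkdtemp(path.join(os.tmpdir(), "csv-plugin-"));
  await writeFile(path.join(directory, "people.csv"), "Name,Count\nPeter,3\n");
  await writeFile(path.join(directory, "cities.csv"), "City\nOslo\n");
  await writeFile(path.join(directory, "notes.txt"), "not a csv file");
  // A directory whose name ends in .csv must not be treated as a file.
  await mkdir(path.join(directory, "nested.csv"));
  return directory;
};
```

Calling listCsvFiles on the directory returned by createSampleDirectory should yield only the paths of cities.csv and people.csv, in that order. Filtering by extension is a design choice: without it, every regular file in the folder becomes a table.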

Converting the raw CSV data to a table

Our getTables function needs to return an array of Table objects, so we will need to convert our raw data. We can use the SDK's createTable function, which requires us to pass a few options:

  • table name
  • column definitions
  • table resolver - a function that will write the actual records to a provided stream

We will start with the column definitions. First, we need to prepare a column resolver function. Column resolvers are responsible for mapping your data into the columns of the table. In most cases, you will not need to implement anything special, but in some cases, you may need to implement a custom column resolver to fetch additional data or do custom transformations.

In our case, we will just record the data as it comes (this resolver is already in place):

const getColumnResolver = (c: string): ColumnResolver => {
  return (meta, resource) => {
    const dataItem = resource.getItem();
    resource.setColumData(c, (dataItem as Record<string, unknown>)[c]);
    return Promise.resolve();
  };
};

We will use the first row of the CSV file to get the column names. For now, we will use Utf8 as the default column type.

The starter project already contains a function getTable that serves as an example so let's modify it to write the data from the CSV file into the stream:

const getTable = async (
    rows: string[][],
    tableName: string,
): Promise<Table> => {
  if (rows.length === 0) {
    throw new Error("No rows found");
  }
  const getRecordObjectFromRow = (row: string[]) => {
    const record: Record<string, string> = {};
    for (const [index, element] of row.entries()) {
      record[columnNames[index]] = element;
    }
    return record;
  };
  const columnNames = rows[0];
  // convert all rows except column definitions to an array of Record<string, string> objects
  const tableRecords = rows.filter((_, index) => index > 0).map((r)=>getRecordObjectFromRow(r));

  const columnDefinitions: Column[] = columnNames.map((c) => ({
    name: c,
    type: new Utf8(),
    description: "",
    primaryKey: false,
    notNull: false,
    incrementalKey: false,
    unique: false,
    ignoreInTests: false,
    resolver: getColumnResolver(c),
  }));

  const tableResolver: TableResolver = (clientMeta, parent, stream) => {
    for (const r of tableRecords) stream.write(r)
    return Promise.resolve();
  };
  return createTable({ name: tableName, columns: columnDefinitions, resolver: tableResolver });
};
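
The row-to-record conversion is the heart of getTable, so it may help to see it on its own. The sketch below is a simplified illustration without any SDK types; rowsToRecords is a hypothetical name, and filling missing cells with empty strings is an assumption of the sketch rather than the behavior of the code above.

```typescript
// Illustrative sketch: the header row supplies the keys, every later row supplies the values.
const rowsToRecords = (rows: string[][]): Record<string, string>[] => {
  const [columnNames, ...dataRows] = rows;
  if (columnNames === undefined) return [];
  return dataRows.map((row) => {
    const record: Record<string, string> = {};
    columnNames.forEach((name, index) => {
      // Assumption of this sketch: a short row leaves the missing cells as empty strings.
      record[name] = row[index] ?? "";
    });
    return record;
  });
};

const sampleRows = [
  ["Name", "Count", "Age"],
  ["Peter", "3", "1.5"],
  ["Mike", "5", "2.3"],
];
const sampleRecords = rowsToRecords(sampleRows);
// sampleRecords[0] is { Name: "Peter", Count: "3", Age: "1.5" }
```

Each record is keyed by column name, which is what lets the column resolver look up the value for its column with a plain property access.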

Connecting it all together

We now have all the pieces in place. We just need to modify the getTables function to load the CSV files and convert them to tables. This is where I will use pMap to apply the concurrency provided in the spec. It is probably not necessary in this case, but it is good practice to limit concurrency whenever you perform I/O operations.

import pMap from "p-map";

// ...
export const getTables = async (
  logger: Logger,
  spec: Spec
): Promise<Table[]> => {
  const { path, csvDelimiter, concurrency } = spec;
  
  const files = await getCsvFiles(logger, path);
  logger.info(`done discovering files. Found ${files.length} files`);

  const allTables = await pMap(
    files,
    async (filePath) => {
      const csvFile = await parseCsvFile(filePath, csvDelimiter);
      return getTable(csvFile, Path.basename(filePath, ".csv"));
    },
    {
      concurrency,
    },
  );
  return allTables;
};
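
If you are curious what a concurrency limit does in practice, the sketch below shows one way to write a concurrency-limited map by hand. It is only an illustration of the idea and not how p-map is implemented; mapWithConcurrency is a hypothetical name.

```typescript
// Illustrative sketch of a concurrency-limited map; NOT p-map's actual implementation.
const mapWithConcurrency = async <T, R>(
  items: T[],
  mapper: (item: T) => Promise<R>,
  concurrency: number,
): Promise<R[]> => {
  const results: R[] = new Array<R>(items.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unprocessed index until none are left.
  const worker = async (): Promise<void> => {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await mapper(items[index]);
    }
  };

  // Never start more workers than there are items, and always start at least one.
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  const workers: Promise<void>[] = [];
  for (let i = 0; i < workerCount; i++) {
    workers.push(worker());
  }
  await Promise.all(workers);
  return results;
};
```

In the plugin, the mapper would parse one file and build one table, and the limit would cap how many files are open at the same time. Results keep the order of the input array regardless of which mapper call finishes first.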

We also need to go back to plugin.ts and update the call to getTables to pass the spec:

  const newClient: NewClientFunction = async (
    logger,
    spec,
    { noConnection },
  ) => {
    pluginClient.spec = parseSpec(spec);
    pluginClient.client = { id: () => "cq-js-sample" };
    if (noConnection) {
      pluginClient.allTables = [];
      return pluginClient;
    }
    pluginClient.allTables = await getTables(logger, pluginClient.spec);

    return pluginClient;
  };

First test

Let's see if this all works. First, we need to modify our sync.yml to add the required configuration:

kind: source
spec:
  name: "text-file"
  registry: "grpc"
  path: "localhost:7777"
  version: "v1.0.0"
  tables:
    ["*"]
  destinations:
    - "sqlite"
  spec:
    path: "test_data"
    csvDelimiter: ","

#...

Make sure you specify a path to a directory with some CSV files. Here's an example CSV file you can use:

Name,Count,Age
Peter,3,1.5
Mike,5,2.3
Jack,2,3.2

Now we can start the plugin in a listening mode and then run cloudquery with the sync.yml in the project.

Start the plugin:

$ npm run dev

> @cloudquery/cq-js-plugin-template@1.0.0 dev
> ts-node --esm src/main.ts serve

[2023-09-28T09:57:13.324Z] info server running on port: 7777

And in a new terminal window, run

$ cloudquery sync sync.yml

Loading spec(s) from sync.yml
Starting sync for: text-file (grpc@localhost:7777) -> [sqlite (v2.4.9)]
/ Syncing resources... (12772/-, 2141 resources/s) [11s] 
Sync completed successfully. Resources: 22463, Errors: 0, Warnings: 0, Time: 14s

Once the sync has finished, the db.sql file should contain new tables with the data from the CSV files. If you imported the example above, it should be created as a table with the CloudQuery columns _cq_sync_time and _cq_source_name and three columns of type TEXT named Name, Count, and Age.

Custom column types

Let's do one last thing: we will improve the plugin by adding automatic detection of columns with numbers. We will do this for files that have at least one row of data (i.e. at least two rows in total), and we will determine the column types by reading the values from the second row.

The types for column definitions are Apache Arrow types exported from the CloudQuery JS SDK. We will use the Float64 type for floating-point numbers and Int64 for integers.

First, we'll create two functions to convert a row represented by an array of strings to an array of DataType objects.

import type { DataType } from "@cloudquery/plugin-sdk-javascript/arrow";
import { Utf8, Int64, Float64 } from "@cloudquery/plugin-sdk-javascript/arrow";

//...

const getColumnType = (value: string): DataType => {
  const number = Number(value);
  if(Number.isNaN(number)) return new Utf8();
  if(Number.isInteger(number)) return new Int64();
  return new Float64();
};

const getColumnTypes = (row: string[]): DataType[] => {
  return row.map((value)=>getColumnType(value));
};
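
To see how a Number()-based check classifies individual cells, here is a small standalone sketch. It makes two assumptions: string labels stand in for the SDK's Utf8, Int64, and Float64 classes so that it runs without the SDK, and classifyValue (a hypothetical name) adds a guard for blank cells, because Number("") evaluates to 0 rather than NaN.

```typescript
// Illustrative sketch: string labels stand in for the SDK's Utf8, Int64 and Float64 classes.
type ColumnTypeName = "utf8" | "int64" | "float64";

const classifyValue = (value: string): ColumnTypeName => {
  // Guard added in this sketch: Number("") is 0, so blank cells are treated as text.
  if (value.trim() === "") return "utf8";
  const parsed = Number(value);
  if (Number.isNaN(parsed)) return "utf8";
  if (Number.isInteger(parsed)) return "int64";
  return "float64";
};

// The second row of the example CSV file shown earlier.
const secondRow = ["Peter", "3", "1.5"];
const detectedTypes = secondRow.map((value) => classifyValue(value));
// detectedTypes is ["utf8", "int64", "float64"]

// Number() is lenient, so a value written as "1.0" still counts as an integer.
const trailingZero = classifyValue("1.0"); // "int64"
```

Because only one row is inspected, a column whose first value happens to look like an integer is typed as an integer even if later rows contain decimals or text, so this approach fits best when the data is uniform.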

Then we will update the getTable function to use the getColumnTypes and to determine the proper type for the data records:


const getTable = async (
  rows: string[][],
  tableName: string,
): Promise<Table> => {
  if (rows.length === 0) {
    throw new Error("No rows found");
  }
  const columnNames = rows[0];
  const getRecordObjectFromRow = (row: string[]) => {
    const record: Record<string, string | number> = {};
    for (const [index, element] of row.entries()) {
      record[columnNames[index]] = Number.isNaN(Number(element)) ? element : Number(element);
    }
    return record;
  };
  const columnTypes = rows.length > 1 ? getColumnTypes(rows[1]) : rows[0].map(()=>new Utf8());
  // convert all rows except column definitions to an array of Record<string, string | number> objects
  const tableRecords = rows.filter((_, index) => index > 0).map((r)=>getRecordObjectFromRow(r));
  const columnDefinitions: Column[] = columnNames.map((c, index) => ({
    name: c,
    type: columnTypes[index],
    description: "",
    primaryKey: false,
    notNull: false,
    incrementalKey: false,
    unique: false,
    ignoreInTests: false,
    resolver: getColumnResolver(c),
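  }));

  // the table resolver and the createTable call stay the same as before
  const tableResolver: TableResolver = (clientMeta, parent, stream) => {
    for (const r of tableRecords) stream.write(r)
    return Promise.resolve();
  };
  return createTable({ name: tableName, columns: columnDefinitions, resolver: tableResolver });
};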

That's it! Now delete the db.sql file, restart the plugin, and test the sync again. This time, the columns in the db.sql file should have the right types: TEXT, INTEGER, and REAL.

Release

JavaScript plugins are published and executed using Docker containers. Our starter project already contains a Dockerfile that packages your plugin into a container. Use the npm run package:container command to build it.

To point CloudQuery to the plugin container, make the following changes to the sync.yml file:

spec:
  name: "cq-js-sample"
  registry: "docker"
  path: "cq-js-sample:latest" # this is the name and tag of the docker image to be pulled.
  version: "v1.0.0"
  tables:
    ["*"]

# ...

To find out how to release your plugin for use by a wider CloudQuery community, see Releasing and Deploying your Plugin in our documentation.

Resources

Starter repository for this blog post and for new plugins: JavaScript Plugin Template

Check out the csv-file-sync branch of the repository above to get to the final code for our CSV plugin.

CloudQuery SDK Documentation

Other JavaScript Source Plugin Examples

Airtable Plugin is another (a bit more advanced) example of a plugin using our JavaScript SDK.