Creating a New JavaScript Source Plugin from Scratch
October 1, 2023
In this blog post, I will walk you through the steps to create a new source plugin using CloudQuery JavaScript SDK. The purpose is to give you a basic overview of how to create tables and columns and how to sync the actual data.
What You Need to Know
Make sure you go over the core concepts of CloudQuery and the JavaScript SDK.
I will be using TypeScript for IDE autocompletion and code type safety.
Prerequisites
To build and release the plugin for public use, you will need Docker (opens in a new tab) to be installed locally.
The Plan
At the high-level, this is what I would like to achieve:
- The plugin should support reading a set of CSV files from a directory.
- Each file will be synced as a single table in the destination.
- The column names will be read from the first row.
- The plugin should recognize basic types - I will start with numbers and everything else will be a string.
Here's what I imagine the configuration will look like:
kind: source
spec:
name: "text-file"
registry: "grpc" # for local testing only, this will change once the plugin is published
path: "localhost:7777" # same as above
version: "v1.0.0"
tables:
["*"]
destinations:
- "sqlite"
spec:
path: "test_data/" # specify folder or a single file
csvDelimiter: ";"
---
kind: destination
spec:
name: sqlite
path: cloudquery/sqlite
version: "v2.4.9"
spec:
connection_string: ./db.sql
Getting Started
We will not start completely from scratch but rather use a template that needs filling in the blanks. There is a javascript-plugin-template (opens in a new tab) starter repository that you can clone.
To install all dependencies and test whether it is running, run the following commands:
npm install
npm run dev # this will run the plugin in dev mode as a server listening on localhost:7777
cloudquery sync sync.yml #run this in a new terminal tab
This will create a new SQLite database file named db.sql
with table Names
and two records. You can open this file using DB Browser for SQLite (opens in a new tab) or any other SQLite client.
What's inside
The main content is in the src
folder:
main.ts
- The main wrapper for the plugin. It's responsibility is to start the plugin the CLI will communicate with when running the sync.plugin.ts
- The body of the plugin. The main responsibility of this module is to return an initialized instance of the plugin to serve.spec.ts
- This module is responsible for handling and validating plugin configuration.tables.ts
- This is the module where the main work happens and the one we'll be working with the most.tables.test.ts
- Sample unit tests for the module above.
How it all works
When CloudQuery runs a sync, it will call the plugin clients' sync
function and expect the plugin to load the tables. We will be implementing the functions to return the tables back to the plugin client.
There are other functions that the The SDK provides a basic implementation for the plugins so we will not need to implement anything else and we will rely on the implementation returned by the newPlugin
function.
The code below (see plugin.ts
) takes care of creating a newClient
function that will be called by the SDK and it will connect the plugin configuration passed in spec
with the getTables
where the actual implementation will happen.
const newClient: NewClientFunction = async (
logger,
spec,
{ noConnection },
) => {
pluginClient.spec = parseSpec(spec);
pluginClient.client = { id: () => "cq-js-sample" };
if (noConnection) {
pluginClient.allTables = [];
return pluginClient;
}
pluginClient.allTables = await getTables();
return pluginClient;
};
pluginClient.plugin = newPlugin("cq-js-sample", version, newClient);
The getTables
is a function exported from the tables
module. It is an asynchronous function returning an array of tables that will be passed to the destination plugin. In this sample plugin, it only returns one table.
// tables.ts
export const getTables = async (
): Promise<Table[]> => {
const table = await getTable();
return [table];
};
The table returned is specified by a definition of columns and a tableResolver
, which takes care of writing to a stream provided by the SDK.
const columnNames = ["First Name", "Last Name"];
const tableRecords = [{"First Name": "Jack", "Last Name": "Bauer"}, {"First Name": "Thomas", "Last Name": "Kirkman"}]
// ...
const tableResolver: TableResolver = (clientMeta, parent, stream) => {
for (const r of tableRecords) stream.write(r)
return Promise.resolve();
};
return createTable({ name: "Names", columns: columnDefinitions, resolver: tableResolver });
We will need to change this to get the tableRecords
from a CSV file.
Let's Code!
Plugin configuration
We will start with the plugin configuration. To parse a set of CSV files, we'll need the following:
- a path to where the files are; mandatory
- a CSV delimiter; defaults to ","
This is what the spec will look like in the YAML for CloudQuery CLI:
spec:
path: "test_data/"
csvDelimiter: ";"
We will define the spec in the spec.ts
file:
const spec = {
type: "object",
properties: {
concurrency: { type: "integer" },
path: { type: "string" },
csvDelimiter: {type: "string" },
},
required: ["path"],
};
const ajv = new Ajv.default();
const validate = ajv.compile(spec);
export type Spec = {
concurrency: number;
path: string;
csvDelimiter: string;
};
export const parseSpec = (spec: string): Spec => {
const parsed = JSON.parse(spec) as Partial<Spec>;
const valid = validate(parsed);
if (!valid) {
throw new Error(`Invalid spec: ${JSON.stringify(validate.errors)}`);
}
const {
concurrency = 10_000,
path = "",
csvDelimiter = ",",
} = camelcaseKeys(parsed);
! return { concurrency, path, csvDelimiter };
};
This will now get passed to our getTables
function in tables.ts
, so we can add it as an argument there. We'll also add a logger and pass it from the plugin initialization.
export const getTables = async (
logger: Logger,
spec: Spec
): Promise<Table[]> => {
const table = await getTable();
return [table];
};
Reading the CSV files
This should be fairly straightforward. We will use the fs
module to get the list of files in the provided path
and then load the files using csv-parse
. We will add a function to get the CSV file paths and another one to parse a file.
Note: Install the csv-parse
package using npm install csv-parse
.
//update imports:
import type { Logger } from "winston";
import fs from "node:fs/promises";
import Path from "node:path";
// ...
const getCsvFiles = async (logger: Logger, path: string): Promise<string[]> => {
const stats = await fs.stat(path);
if (stats.isDirectory()) {
const files = await fs.readdir(path, { withFileTypes: true });
return files.filter((f) => f.isFile()).map((f) => Path.join(path, f.name));
}
logger.error("Target path is not a directory.");
return [];
};
const parseCsvFile = async (path: string, csvDelimiter: string): Promise<string[][]> => {
const content = await fs.readFile(path);
return new Promise<string[][]>((resolve, reject) => {
parse(content, { delimiter: csvDelimiter }, (error, records) => {
if (error) {
reject(error);
return;
}
resolve(records);
});
});
}
Converting the raw CSV data to a table
Our getTables
function needs to an array of Table
objects so we will need to convert our raw data. We can use the SDK's createTable
function which requires us to pass a few options:
- table name
- column definitions
- table resolver - a function that will write the actual records to a provided stream
We will start with the column definitions. First, we need to prepare a column resolver function. Column resolvers (opens in a new tab) are responsible for mapping your data into the columns of the table. In most cases, you will not need to implement anything special, but in some cases, you may need to implement a custom column resolver to fetch additional data or do custom transformations.
In our case, we will just record the data as it comes (this resolver is already in place):
const getColumnResolver = (c: string): ColumnResolver => {
return (meta, resource) => {
const dataItem = resource.getItem();
resource.setColumData(c, (dataItem as Record<string, unknown>)[c]);
return Promise.resolve();
};
};
We will use the first row of the CSV file to get the column names. We will use the Utf8
as the default column type for now.
The starter project already contains a function getTable
that serves as an example so let's modify it to write the data from the CSV file into the stream:
const getTable = async (
rows: string[][],
tableName: string,
): Promise<Table> => {
if (rows.length === 0) {
throw new Error("No rows found");
}
const getRecordObjectFromRow = (row: string[]) => {
const record: Record<string, string> = {};
for (const [index, element] of row.entries()) {
record[columnNames[index]] = element;
}
return record;
};
const columnNames = rows[0];
// convert all rows except column definitions to an array of Record<string, string> objects
const tableRecords = rows.filter((_, index) => index > 0).map((r)=>getRecordObjectFromRow(r));
const columnDefinitions: Column[] = columnNames.map((c) => ({
name: c,
type: new Utf8(),
description: "",
primaryKey: false,
notNull: false,
incrementalKey: false,
unique: false,
ignoreInTests: false,
resolver: getColumnResolver(c),
}));
const tableResolver: TableResolver = (clientMeta, parent, stream) => {
for (const r of tableRecords) stream.write(r)
return Promise.resolve();
};
return createTable({ name: tableName, columns: columnDefinitions, resolver: tableResolver });
};
Connecting it all together
We now have all the pieces in place. We just need to modify the getTables
function to load the CSV files and convert them to tables. This is the place where I will use pMap
to use the concurrency provided in the spec. It is probably not necessary in this case but it is a good practice to use it when you are doing any IO operations.
import pMap from "p-map";
// ...
export const getTables = async (
logger: Logger,
spec: Spec
): Promise<Table[]> => {
const { path, csvDelimiter, concurrency } = spec;
const files = await getCsvFiles(logger, path);
logger.info(`done discovering files. Found ${files.length} files`);
const allTables = await pMap(
files,
async (filePath) => {
const csvFile = await parseCsvFile(filePath, csvDelimiter);
return getTable(csvFile, Path.basename(filePath, ".csv"));
},
{
concurrency,
},
);
return allTables;
};
We also need to go back to plugin.ts
and update the call to getTables
to pass the spec:
const newClient: NewClientFunction = async (
logger,
spec,
{ noConnection },
) => {
pluginClient.spec = parseSpec(spec);
pluginClient.client = { id: () => "cq-js-sample" };
if (noConnection) {
pluginClient.allTables = [];
return pluginClient;
}
pluginClient.allTables = await getTables(logger, pluginClient.spec);
return pluginClient;
};
First test
Let's see if this all works. First, we need to modify our sync.yml
to add the required configuration:
kind: source
spec:
name: "text-file"
registry: "grpc"
path: "localhost:7777"
version: "v1.0.0"
tables:
["*"]
destinations:
- "sqlite"
spec:
path: "test_data"
csvDelimiter: ","
#...
Make sure you specify a path to a directory with some CSV files. Here's an example CSV file you can use:
Name,Count,Age
Peter,3,1.5
Mike,5,2.3
Jack,2,3.2
Now we can start the plugin in a listening mode and then run cloudquery with the sync.yml in the project.
Start the plugin:
$ npm run dev
> @cloudquery/cq-js-plugin-template@1.0.0 dev
> ts-node --esm src/main.ts serve
[2023-09-28T09:57:13.324Z] info server running on port: 7777
And in a new terminal window, run
$ cloudquery sync sync.yml
Loading spec(s) from sync.yml
Starting sync for: text-file (grpc@localhost:7777) -> [sqlite (v2.4.9)]
/ Syncing resources... (12772/-, 2141 resources/s) [11s]
Sync completed successfully. Resources: 22463, Errors: 0, Warnings: 0, Time: 14s
Once the sync finished, the db.sql
file should contain new tables with the data from the CSV files. If you imported the example above, it should be created as a table with CloudQuery columns _cq_sync_time
and _cq_source_name
and three columns of type TEXT
with the names Name
, Count
, and Age
.
Custom column types
Let's do one last thing: we will improve the plugin by adding an automatic detection of columns with numbers. We will do this for files that have at least one row with data (i.e. they have at least two rows) and we will determine the column types by reading the values from the second row.
The types for column definitions are Apache Arrow (opens in a new tab) types exported from CloudQuery JS SDK. We will use the Float64
type for floating-point numbers and Int64
for integers.
First, we'll create two functions to convert a row represented by an array of strings to an array of DataType
objects.
import type { DataType } from "@cloudquery/plugin-sdk-javascript/arrow";
import { Utf8, Int64, Float64 } from "@cloudquery/plugin-sdk-javascript/arrow";
//...
const getColumnType = (value: string): DataType => {
const number = Number(value);
if(Number.isNaN(number)) return new Utf8();
if(Number.isInteger(number)) return new Int64();
return new Float64();
};
const getColumnTypes = (row: string[]): DataType[] => {
return row.map((value)=>getColumnType(value));
};
Then we will update the getTable
function to use the getColumnTypes
and to determine the proper type for the data records:
const getTable = async (
rows: string[][],
tableName: string,
): Promise<Table> => {
if (rows.length === 0) {
throw new Error("No rows found");
}
const columnNames = rows[0];
const getRecordObjectFromRow = (row: string[]) => {
const record: Record<string, string | number> = {};
for (const [index, element] of row.entries()) {
record[columnNames[index]] = Number.isNaN(Number(element)) ? element : Number(element);
}
return record;
};
const columnTypes = rows.length > 1 ? getColumnTypes(rows[1]) : rows[0].map(()=>new Utf8());
// convert all rows except column definitions to an array of Record<string, string> objects
const tableRecords = rows.filter((_, index) => index > 0).map((r)=>getRecordObjectFromRow(r));
const columnDefinitions: Column[] = columnNames.map((c, index) => ({
name: c,
type: columnTypes[index],
description: "",
primaryKey: false,
notNull: false,
incrementalKey: false,
unique: false,
ignoreInTests: false,
resolver: getColumnResolver(c),
}));
That's it! Now delete the db.sql
file, restart the plugin, and test the sync again. This time, the db.sql
file should contain the right type for your number columns: TEXT
, INTEGER
, and REAL
.
Release
JavaScript plugins are published and executed using Docker containers. Our starter project already contains a Dockerfile that packages your plugin into a container. Use the npm run package:container
command to build it.
To point CloudQuery to the plugin container, make the following changes to the sync.yml
file:
spec:
name: "cq-js-sample"
registry: "docker"
path: "cq-js-sample:latest" # this is the name and tag of the docker image to be pulled.
version: "v1.0.0"
tables:
["*"]
// ...
To find out how to release your plugin for use by a wider CloudQuery community, see Releasing and Deploying your Plugin in our documentation.
Resources
Starter repository for this blog post and for new plugins: JavaScript Plugin Template (opens in a new tab)
Check out the csv-file-sync
(opens in a new tab) branch of the repository above to get to the final code for our CSV plugin.
Other JavaScript Source Plugin Examples
Airtable Plugin (opens in a new tab) is another (a bit more advanced) example of a plugin using our JavaScript SDK.