In
the first post of this series I mentioned some popular modules in
the community, such as underscore, async, etc.. I also listed a module named “Wind (zh-CN)”, which is created by one of my friend, Jeff Zhao (zh-CN). Now I would like to use a separated post to introduce this module since I feel it brings a new async programming style in not only Node.js but JavaScript world. If you know or heard about
the new feature in C# 5.0 called “async and await”, or you learnt F#, you will find
the “Wind” brings
the similar async programming experience in JavaScript. By using “Wind”, we can write async code that looks like
the sync code.
The callbacks, async stats and exceptions will be handled by “Wind” automatically and transparently. What’s
the Problem: Dense “Callback” Phobia Let’s firstly back to my second post in this series. As I mentioned in that post, when we wanted to read some records from SQL Server we need to open
the database connection, and then execute
the query. In Node.js all IO operation are designed as async callback pattern which means when
the operation was done, it will invoke a function which was taken from
the last parameter. For example
the database connection opening code would be like this. 1: sql.open(connectionString, function(error, conn) {
2: if(error) {
3: // some error handling code
4: }
5: else {
6: // connection opened successfully
7: }
8: });
And then if we need to query
the database
the code would be like this. It nested in
the previous function.
1: sql.open(connectionString, function(error, conn) {
2: if(error) {
3: // some error handling code
4: }
5: else {
6: // connection opened successfully
7: conn.queryRaw(command, function(error, results) {
8: if(error) {
9: // failed to execute this command
10: }
11: else {
12: // records retrieved successfully
13: }
14: };
15: }
16: });
Assuming if we need to copy some data from this database to another then we need to open another connection and execute
the command within
the function under
the query function.
1: sql.open(connectionString, function(error, conn) {
2: if(error) {
3: // some error handling code
4: }
5: else {
6: // connection opened successfully
7: conn.queryRaw(command, function(error, results) {
8: if(error) {
9: // failed to execute this command
10: }
11: else {
12: // records retrieved successfully
13: target.open(targetConnectionString, function(error, t_conn) {
14: if(error) {
15: // connect failed
16: }
17: else {
18: t_conn.queryRaw(copy_command, function(error, results) {
19: if(error) {
20: // copy failed
21: }
22: else {
23: // and then, what do you want to do now...
24: }
25: };
26: }
27: };
28: }
29: };
30: }
31: });
This is just an example. In
the real project
the logic would be more complicated. This means our application might be messed up and
the business process will be fragged by many callback functions. I would like call this “Dense Callback Phobia”.
This might be a challenge how to make code straightforward and easy to read, something like below.
1: try
2: {
3: // open source connection
4: var s_conn = sqlConnect(s_connectionString);
5: // retrieve data
6: var results = sqlExecuteCommand(s_conn, s_command);
7:
8: // open target connection
9: var t_conn = sqlConnect(t_connectionString);
10: // prepare
the copy command
11: var t_command = getCopyCommand(results);
12: // execute
the copy command
13: sqlExecuteCommand(s_conn, t_command);
14: }
15: catch (ex)
16: {
17: // error handling
18: }
What’s
the Problem: Sync-styled Async Programming
Similar as
the previous problem,
the callback-styled async programming model makes
the upcoming operation as a part of
the current operation, and mixed with
the error handling code. So it’s very hard to understand what on earth this code will do.
And since Node.js utilizes non-blocking IO mode, we cannot invoke those operations one by one, as they will be executed concurrently.
For example, in this post when I tried to copy
the records from Windows Azure SQL Database (a.k.a. WASD) to Windows Azure Table Storage, if I just insert
the data into table storage one by one and then print
the “Finished” message, I will see
the message shown before
the data had been copied. This is because all operations were executed at
the same time.
In order to make
the copy operation and print operation executed synchronously I introduced a module named “async” and
the code was changed as below.
1: async.forEach(results.rows,
2: function (row, callback) {
3: var resource = {
4: "PartitionKey": row[1],
5: "RowKey": row[0],
6: "Value": row[2]
7: };
8: client.insertEntity(tableName, resource, function (error) {
9: if (error) {
10: callback(error);
11: }
12: else {
13: console.log("entity inserted.");
14: callback(null);
15: }
16: });
17: },
18: function (error) {
19: if (error) {
20: error["target"] = "insertEntity";
21: res.send(500, error);
22: }
23: else {
24: console.log("all done.");
25: res.send(200, "Done!");
26: }
27: });
It ensured that
the “Finished” message will be printed when all table entities had been inserted. But it cannot promise that
the records will be inserted in sequence. It might be another challenge to make
the code looks like in sync-style?
1: try
2: {
3: forEach(row in rows) {
4: var entity = { /* ... */ };
5: tableClient.insert(tableName, entity);
6: }
7:
8: console.log("Finished");
9: }
10: catch (ex) {
11: console.log(ex);
12: }
How “Wind” Helps
“Wind” is a JavaScript library which provides
the control flow with plain JavaScript for asynchronous programming (and more) without additional pre-compiling steps. It’s available in NPM so that we can install it through “npm install wind”.
Now let’s create a very simple Node.js application as
the example. This application will take some website URLs from
the command arguments and tried to retrieve
the body length and print them in console. Then at
the end print “Finish”. I’m going to use “request” module to make
the HTTP call simple so I also need to install by
the command “npm install request”.
The code would be like this.
1: var request = require("request");
2:
3: // get
the urls from arguments,
the first two arguments are `node.exe` and `fetch.js`
4: var args = process.argv.splice(2);
5:
6: // main function
7: var main = function() {
8: for(var i = 0; i < args.length; i++) {
9: // get
the url
10: var url = args[i];
11: // send
the http request and try to get
the response and body
12: request(url, function(error, response, body) {
13: if(!error && response.statusCode == 200) {
14: // log
the url and
the body length
15: console.log(
16: "%s: %d.",
17: response.request.uri.href,
18: body.length);
19: }
20: else {
21: // log error
22: console.log(error);
23: }
24: });
25: }
26:
27: // finished
28: console.log("Finished");
29: };
30:
31: // execute
the main function
32: main();
Let’s execute this application. (I made them in multi-lines for better reading.)
1: node fetch.js
2: "http://www.igt.com/us-en.aspx"
3: "http://www.igt.com/us-en/games.aspx"
4: "http://www.igt.com/us-en/cabinets.aspx"
5: "http://www.igt.com/us-en/systems.aspx"
6: "http://www.igt.com/us-en/interactive.aspx"
7: "http://www.igt.com/us-en/social-gaming.aspx"
8: "http://www.igt.com/support.aspx"
Below is
the output.
As you can see
the finish message was printed at
the beginning, and
the pages’ length retrieved in a different order than we specified. This is because in this code
the request command, console logging command are executed asynchronously and concurrently.
Now let’s introduce “Wind” to make them executed in order, which means it will request
the websites one by one, and print
the message at
the end.
First of all we need to import
the “Wind” package and make sure
the there’s only one global variant named “Wind”, and ensure it’s “Wind” instead of “wind”.
1: var Wind = require("wind");
Next, we need to tell “Wind” which code will be executed asynchronously so that “Wind” can control
the execution process. In this case
the “request” operation executed asynchronously so we will create a “Task” by using a build-in helps function in “Wind” named Wind.Async.Task.create.
1: var requestBodyLengthAsync = function(url) {
2: return Wind.Async.Task.create(function(t) {
3: request(url, function(error, response, body) {
4: if(error || response.statusCode != 200) {
5: t.complete("failure", error);
6: }
7: else {
8: var data =
9: {
10: uri: response.request.uri.href,
11: length: body.length
12: };
13: t.complete("success", data);
14: }
15: });
16: });
17: };
The code
above created a “Task” from
the original request calling code. In “Wind” a “Task” means an operation will be finished in some time in
the future. A “Task” can be started by invoke its start() method, but no one knows when it actually will be finished.
The Wind.Async.Task.create helped us to create a task.
The only parameter is a function where we can put
the actual operation in, and then notify
the task object it’s finished successfully or failed by using
the complete() method.
In
the code
above I invoked
the request method. If it retrieved
the response successfully I set
the status of this task as “success” with
the URL and body length. If it failed I set this task as “failure” and pass
the error out.
Next, we will change
the main() function. In “Wind” if we want a function can be controlled by Wind we need to mark it as “async”. This should be done by using
the code below.
1: var main = eval(Wind.compile("async", function() {
2: }));
When
the application is running, Wind will detect “eval(Wind.compile(“async”, function” and generate an anonymous code from
the body of this original function. Then
the application will run
the anonymous code instead of
the original one.
In our example
the main function will be like this.
1: var main = eval(Wind.compile("async", function() {
2: for(var i = 0; i < args.length; i++) {
3: try
4: {
5: var result = $await(requestBodyLengthAsync(args[i]));
6: console.log(
7: "%s: %d.",
8: result.uri,
9: result.length);
10: }
11: catch (ex) {
12: console.log(ex);
13: }
14: }
15:
16: console.log("Finished");
17: }));
As you can see, when I tried to request
the URL I use a new command named “$await”. It tells Wind,
the operation next to $await will be executed asynchronously, and
the main thread should be paused until it finished (or failed). So in this case, my application will be pause when
the first response was received, and then print its body length, then try
the next one. At
the end, print
the finish message.
Finally, execute
the main function.
The full code would be like this.
1: var request = require("request");
2: var Wind = require("wind");
3:
4: var args = process.argv.splice(2);
5:
6: var requestBodyLengthAsync = function(url) {
7: return Wind.Async.Task.create(function(t) {
8: request(url, function(error, response, body) {
9: if(error || response.statusCode != 200) {
10: t.complete("failure", error);
11: }
12: else {
13: var data =
14: {
15: uri: response.request.uri.href,
16: length: body.length
17: };
18: t.complete("success", data);
19: }
20: });
21: });
22: };
23:
24: var main = eval(Wind.compile("async", function() {
25: for(var i = 0; i < args.length; i++) {
26: try
27: {
28: var result = $await(requestBodyLengthAsync(args[i]));
29: console.log(
30: "%s: %d.",
31: result.uri,
32: result.length);
33: }
34: catch (ex) {
35: console.log(ex);
36: }
37: }
38:
39: console.log("Finished");
40: }));
41:
42: main().start();
Run our new application. At
the beginning we will see
the compiled and generated code by Wind. Then we can see
the pages were requested one by one, and at
the end
the finish message was printed.
Below is
the code Wind generated for us. As you can see
the original code,
the output code were shown.
1: // Original:
2: function () {
3: for(var i = 0; i < args.length; i++) {
4: try
5: {
6: var result = $await(requestBodyLengthAsync(args[i]));
7: console.log(
8: "%s: %d.",
9: result.uri,
10: result.length);
11: }
12: catch (ex) {
13: console.log(ex);
14: }
15: }
16:
17: console.log("Finished");
18: }
19:
20: // Compiled:
21: /* async << function () { */ (function () {
22: var _builder_$0 = Wind.builders["async"];
23: return _builder_$0.Start(this,
24: _builder_$0.Combine(
25: _builder_$0.Delay(function () {
26: /* var i = 0; */ var i = 0;
27: /* for ( */ return _builder_$0.For(function () {
28: /* ; i < args.length */ return i < args.length;
29: }, function () {
30: /* ; i ++) { */ i ++;
31: },
32: /* try { */ _builder_$0.Try(
33: _builder_$0.Delay(function () {
34: /* var result = $await(requestBodyLengthAsync(args[i])); */ return _builder_$0.Bind(requestBodyLengthAsync(args[i]), function (result) {
35: /* console.log("%s: %d.", result.uri, result.length); */ console.log("%s: %d.", result.uri, result.length);
36: return _builder_$0.Normal();
37: });
38: }),
39: /* } catch (ex) { */ function (ex) {
40: /* console.log(ex); */ console.log(ex);
41: return _builder_$0.Normal();
42: /* } */ },
43: null
44: )
45: /* } */ );
46: }),
47: _builder_$0.Delay(function () {
48: /* console.log("Finished"); */ console.log("Finished");
49: return _builder_$0.Normal();
50: })
51: )
52: );
53: /* } */ })
How Wind Works
Someone may raise a big concern when you find I utilized “eval” in my code. Someone may assume that Wind utilizes “eval” to execute some code dynamically while “eval” is very low performance. But I would say, Wind does NOT use “eval” to run
the code. It only use “eval” as a flag to know which code should be compiled at runtime.
When
the code was firstly been executed, Wind will check and find “eval(Wind.compile(“async”, function”. So that it knows this function should be compiled. Then it utilized parse-js to analyze
the inner JavaScript and generated
the anonymous code in memory. Then it rewrite
the original code so that when
the application was running it will use
the anonymous one instead of
the original one.
Since
the code generation was done at
the beginning of
the application was started, in
the future no matter how long our application runs and how many times
the async function was invoked, it will use
the generated code, no need to generate again. So there’s no significant performance hurt when using Wind.
Wind in My Previous Demo
Let’s adopt Wind into one of my previous demonstration and to see how it helps us to make our code simple, straightforward and easy to read and understand.
In this post when I implemented
the functionality that copied
the records from my WASD to table storage,
the logic would be like this.
1, Open database connection.
2, Execute a query to select all records from
the table.
3, Recreate
the table in Windows Azure table storage.
4, Create entities from each of
the records retrieved previously, and then insert them into table storage.
5, Finally, show message as
the HTTP response.
But as
the image below, since there are so many callbacks and async operations, it’s very hard to understand my logic from
the code.
Now let’s use Wind to rewrite our code. First of all, of course, we need
the Wind package.
Then we need to include
the package files into project and mark them as “Copy always”.
Add
the Wind package into
the source code. Pay attention to
the variant name, you must use “Wind” instead of “wind”.
1: var express = require("express");
2: var async = require("async");
3: var sql = require("node-sqlserver");
4: var azure = require("azure");
5: var Wind = require("wind");
Now we need to create some async functions by using Wind. All async functions should be wrapped so that it can be controlled by Wind which are open database, retrieve records, recreate table (delete and create) and insert entity in table.
Below are these new functions. All of them are created by using Wind.Async.Task.create.
1: sql.openAsync = function (connectionString) {
2: return Wind.Async.Task.create(function (t) {
3: sql.open(connectionString, function (error, conn) {
4: if (error) {
5: t.complete("failure", error);
6: }
7: else {
8: t.complete("success", conn);
9: }
10: });
11: });
12: };
13:
14: sql.queryAsync = function (conn, query) {
15: return Wind.Async.Task.create(function (t) {
16: conn.queryRaw(query, function (error, results) {
17: if (error) {
18: t.complete("failure", error);
19: }
20: else {
21: t.complete("success", results);
22: }
23: });
24: });
25: };
26:
27: azure.recreateTableAsync = function (tableName) {
28: return Wind.Async.Task.create(function (t) {
29: client.deleteTable(tableName, function (error, successful, response) {
30: console.log("delete table finished");
31: client.createTableIfNotExists(tableName, function (error, successful, response) {
32: console.log("create table finished");
33: if (error) {
34: t.complete("failure", error);
35: }
36: else {
37: t.complete("success", null);
38: }
39: });
40: });
41: });
42: };
43:
44: azure.insertEntityAsync = function (tableName, entity) {
45: return Wind.Async.Task.create(function (t) {
46: client.insertEntity(tableName, entity, function (error, entity, response) {
47: if (error) {
48: t.complete("failure", error);
49: }
50: else {
51: t.complete("success", null);
52: }
53: });
54: });
55: };
Then in order to use these functions we will create a new function which contains all steps for data copying.
1: var copyRecords = eval(Wind.compile("async", function (req, res) {
2: try {
3: }
4: catch (ex) {
5: console.log(ex);
6: res.send(500, "Internal error.");
7: }
8: }));
Let’s execute steps one by one with
the “$await” keyword introduced by Wind so that it will be invoked in sequence. First is to open
the database connection.
1: var copyRecords = eval(Wind.compile("async", function (req, res) {
2: try {
3: // connect to
the windows azure sql database
4: var conn = $await(sql.openAsync(connectionString));
5: console.log("connection opened");
6: }
7: catch (ex) {
8: console.log(ex);
9: res.send(500, "Internal error.");
10: }
11: }));
Then retrieve all records from
the database connection.
1: var copyRecords = eval(Wind.compile("async", function (req, res) {
2: try {
3: // connect to
the windows azure sql database
4: var conn = $await(sql.openAsync(connectionString));
5: console.log("connection opened");
6: // retrieve all records from database
7: var results = $await(sql.queryAsync(conn, "SELECT * FROM [Resource]"));
8: console.log("records selected. count = %d", results.rows.length);
9: }
10: catch (ex) {
11: console.log(ex);
12: res.send(500, "Internal error.");
13: }
14: }));
After recreated
the table, we need to create
the entities and insert them into table storage.
1: var copyRecords = eval(Wind.compile("async", function (req, res) {
2: try {
3: // connect to
the windows azure sql database
4: var conn = $await(sql.openAsync(connectionString));
5: console.log("connection opened");
6: // retrieve all records from database
7: var results = $await(sql.queryAsync(conn, "SELECT * FROM [Resource]"));
8: console.log("records selected. count = %d", results.rows.length);
9: if (results.rows.length > 0) {
10: // recreate
the table
11: $await(azure.recreateTableAsync(tableName));
12: console.log("table created");
13: // insert records in table storage one by one
14: for (var i = 0; i < results.rows.length; i++) {
15: var entity = {
16: "PartitionKey": results.rows[i][1],
17: "RowKey": results.rows[i][0],
18: "Value": results.rows[i][2]
19: };
20: $await(azure.insertEntityAsync(tableName, entity));
21: console.log("entity inserted");
22: }
23: }
24: }
25: catch (ex) {
26: console.log(ex);
27: res.send(500, "Internal error.");
28: }
29: }));
Finally, send response back to
the browser.
1: var copyRecords = eval(Wind.compile("async", function (req, res) {
2: try {
3: // connect to
the windows azure sql database
4: var conn = $await(sql.openAsync(connectionString));
5: console.log("connection opened");
6: // retrieve all records from database
7: var results = $await(sql.queryAsync(conn, "SELECT * FROM [Resource]"));
8: console.log("records selected. count = %d", results.rows.length);
9: if (results.rows.length > 0) {
10: // recreate
the table
11: $await(azure.recreateTableAsync(tableName));
12: console.log("table created");
13: // insert records in table storage one by one
14: for (var i = 0; i < results.rows.length; i++) {
15: var entity = {
16: "PartitionKey": results.rows[i][1],
17: "RowKey": results.rows[i][0],
18: "Value": results.rows[i][2]
19: };
20: $await(azure.insertEntityAsync(tableName, entity));
21: console.log("entity inserted");
22: }
23: // send response
24: console.log("all done");
25: res.send(200, "All done!");
26: }
27: }
28: catch (ex) {
29: console.log(ex);
30: res.send(500, "Internal error.");
31: }
32: }));
If we compared with
the previous code we will find now it became more readable and much easy to understand. It’s very easy to know what this function does even though without any comments.
When user go to URL “/was/copyRecords” we will execute
the function
above.
The code would be like this.
1: app.get("/was/copyRecords", function (req, res) {
2: copyRecords(req, res).start();
3: });
And below is
the logs printed in local compute emulator console. As we can see
the functions executed one by one and then finally
the response back to me browser.
Scaffold Functions in Wind
Wind provides not only
the async flow control and compile functions, but many scaffold methods as well. We can build our async code more easily by using them. I’m going to introduce some basic scaffold functions here.
In
the code
above I created some functions which wrapped from
the original async function such as open database, create table, etc.. All of them are very similar, created a task by using Wind.Async.Task.create, return error or result object through Task.complete function. In fact, Wind provides some functions for us to create task object from
the original async functions.
If
the original async function only has a callback parameter, we can use Wind.Async.Binding.fromCallback method to get
the task object directly. For example
the code below returned
the task object which wrapped
the file exist check function.
1: var Wind = require("wind");
2: var fs = require("fs");
3:
4: fs.existsAsync = Wind.Async.Binding.fromCallback(fs.exists);
In Node.js a very popular async function pattern is that,
the first parameter in
the callback function represent
the error object, and
the other parameters is
the return values. In this case we can use another build-in function in Wind named Wind.Async.Binding.fromStandard. For example,
the open database function can be created from
the code below.
1: sql.openAsync = Wind.Async.Binding.fromStandard(sql.open);
2:
3: /*
4: sql.openAsync = function (connectionString) {
5: return Wind.Async.Task.create(function (t) {
6: sql.open(connectionString, function (error, conn) {
7: if (error) {
8: t.complete("failure", error);
9: }
10: else {
11: t.complete("success", conn);
12: }
13: });
14: });
15: };
16: */
When I was testing
the scaffold functions under Wind.Async.Binding I found for some functions, such as
the Azure SDK insert entity function, cannot be processed correctly. So I personally suggest writing
the wrapped method manually.
Another scaffold method in Wind is
the parallel tasks coordination. In this example,
the steps of open database, retrieve records and recreated table should be invoked one by one, but it can be executed in parallel when copying data from database to table storage. In Wind there’s a scaffold function named Task.whenAll which can be used here.
Task.whenAll accepts a list of tasks and creates a new task. It will be returned only when all tasks had been completed, or any errors occurred. For example in
the code below I used
the Task.whenAll to make all copy operation executed at
the same time.
1: var copyRecordsInParallel = eval(Wind.compile("async", function (req, res) {
2: try {
3: // connect to
the windows azure sql database
4: var conn = $await(sql.openAsync(connectionString));
5: console.log("connection opened");
6: // retrieve all records from database
7: var results = $await(sql.queryAsync(conn, "SELECT * FROM [Resource]"));
8: console.log("records selected. count = %d", results.rows.length);
9: if (results.rows.length > 0) {
10: // recreate
the table
11: $await(azure.recreateTableAsync(tableName));
12: console.log("table created");
13: // insert records in table storage in parallal
14: var tasks = new Array(results.rows.length);
15: for (var i = 0; i < results.rows.length; i++) {
16: var entity = {
17: "PartitionKey": results.rows[i][1],
18: "RowKey": results.rows[i][0],
19: "Value": results.rows[i][2]
20: };
21: tasks[i] = azure.insertEntityAsync(tableName, entity);
22: }
23: $await(Wind.Async.Task.whenAll(tasks));
24: // send response
25: console.log("all done");
26: res.send(200, "All done!");
27: }
28: }
29: catch (ex) {
30: console.log(ex);
31: res.send(500, "Internal error.");
32: }
33: }));
34:
35: app.get("/was/copyRecordsInParallel", function (req, res) {
36: copyRecordsInParallel(req, res).start();
37: });
Besides
the task creation and coordination, Wind supports
the cancellation solution so that we can send
the cancellation signal to
the tasks. It also includes exception solution which means any exceptions will be reported to
the caller function.
Summary
In this post I introduced a Node.js module named Wind, which created by my friend Jeff Zhao. As you can see, different from other async library and framework, adopted
the idea from F# and C#, Wind utilizes runtime code generation technology to make it more easily to write async, callback-based functions in a sync-style way. By using Wind there will be almost no callback, and
the code will be very easy to understand.
Currently Wind is still under developed and improved. There might be some problems but
the author, Jeff, should be very happy and enthusiastic to learn your problems, feedback, suggestion and comments. You can contact Jeff by
- Email:
[email protected]
- Group: https://groups.google.com/d/forum/windjs
- GitHub: https://github.com/JeffreyZhao/wind/issues
Source code can be download here.
Hope this helps,
Shaun
All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under
the Creative Commons License.