Easily Create Complex Workflows With AWS Step Functions

Often applications, especially if we are talking about e-Commerce or enterprise software, consist of complex repeatable scenarios(workflows) that must be executed in a response to some event. Let’s take a look at a typical example of such workflow that we can see in e-Commerce software — order processing:

In the world of serveless solutions, each action(node) of the workflow can be represented as a corresponding AWS Lambda function. It will be short, easy to test and will have a single responsibility — doing its actual job. But who will be responsible for coordinating those functions, chaining them, checking conditions and deciding what to do next? Earlier, it was the responsibility of the developer to implement the way multiple connected Lambda functions should interact. But recently Amazon has announced a new service that allows coordinating various AWS services by using the workflow abstraction and visual tools — AWS Step Functions.

In this article, I’m going to demonstrate some basic features of AWS Step Functions and how they can be executed in a response to any event.

We’ll create a simple workflow that will be executed once a new file is uploaded to an S3 bucket. It will process it, save contents to the DynamoDB, move the file to the “processed” folder and notify the user via email in 10 minutes after processing. It will be something like this:

Input files will contain sensor data in the CSV format:

<SENSOR_UID>,<TIMESTAMP>,<VALUE>

For example:

0000-0001,1483906366,10.10
0000-0011,1483906376,-5.10
0000-0093,1483906376,3.80
0000-0107,1483906520,115.45
0000-0001,1483906520,27.54
0000-2101,1483908322,340.00
0000-0001,1483908322,-12.93
0000-0001,1483906366,00.35

Preparation

Before we proceed with creating our first workflow, we need to go over the next checklist to ensure all required software is installed and an account with proper permissions is used.

  1. Check that AWS CLI is installed. You should be able to execute “aws –version” in your command line successfully. In a case, if the AWS CLI is not installed, please follow instructions from the AWS CLI Page.

  2. Double check AWS credentials are correctly configured. The easiest way to do it is to execute aws configure command.

  3. Ensure the current user has enough permissions to view/create CloudFormation Stacks, view/create AWS Lambda functions, view/create AWS Step Functions, view/create buckets.

  4. Since we will be writing our Lambda functions using JavaScript, ensure NodeJS & NPM are installed. Please refer to the official documentation to find instructions on how to install it for your OS.

Creating Lambda Functions

We have four nodes in our workflow, but one of them describes a wait state, meaning the workflow itself requires three AWS Lambda functions.

Unfortunately, there is no way at the moment to execute a Step Function as a response to some event. Instead, the execution should be explicitly started using the API. To execute our workflow each time file is uploaded, we can simply create an additional Lambda function that will be triggered by the file upload and will execute our workflow. So we will need four functions: executeWorkflow, processFile, moveFile, and sendEmail.

Create a new serverless.io project

First of all, ensure serverless is installed. It can be done using the following command:

sudo npm install -g serverless serverless-step-functions

Once the serverless is installed, a new project should be created:

mkdir step-functions-demo
cd step-functions-demo
serverless create -t aws-nodejs
npm init -y
npm install async aws-sdk --save

The serverless framework simplifies our lives and allows to quickly create Lambda-backed applications without a need of writing long CloudFormation scripts and deploying functions manually.

How to trigger an execution of a step function by an S3 event?

Update: This section was missing initially. Kudos to HackerNews user djhworld who pointed that out! AWS Step Functions doesn’t have an option to trigger the execution by events similar to AWS Lambda: S3, IoT, SNS, etc. The only alternative at the moment is explicitly executing a step function using the AWS SDK.

While it may seem like an issue at first, the problem can be easily solved by wiring your event to a separate Lambda that acts as a proxy and passes its input parameters to the target step function.

Here is a related code snippet:

module.exports.executeWorkflow = function (event, context) {
  if ('Records' in event) {
    const stateMachineName = process.env.STEP_FUNCTION_NAME;
    const stepfunctions = new AWS.StepFunctions();

    async.waterfall([
      (next) => {
        console.log('Fetching the list of available workflows');
        return stepfunctions.listStateMachines({}, next);
      },
      (data, next) => {
        console.log(data, next);
        console.log('Searching for the step function', data);

        for (var i = 0; i < data.stateMachines.length; i++) {
          const item = data.stateMachines[i];
          if (item.name === stateMachineName) {
            console.log('Found the step function', item);
            return next(null, item.stateMachineArn);
          }
        }

        throw 'Step function with the given name doesn\'t exist';
      },
      (stateMachineArn, next) => {
        console.log('Executing the step function', stateMachineArn);
        const eventData = event.Records[0];
        return stepfunctions.startExecution({
          stateMachineArn: stateMachineArn,
          input: JSON.stringify({ objectKey: eventData.s3.object.key, bucketName: eventData.s3.bucket.name })
        }, next);
      },
      () => {
        return context.succeed('OK');
      }
    ]);
  } else {
    return context.fail('Incoming message doesn\'t contain "Records", it will be ignored', event);
  }
};

Note the Lambda function uses the STEP_FUNCTION_NAME environment variable. This workaround was needed because at the moment the post was written, we didn’t have a way to reference a Step Function ARN in other parts of the serverless configuration.

File processing Lambda

Let’s begin with the Lambda function that will be responsible for parsing the CSV and saving it to a DynamoDB table.

Here is a code fragment used for processing:

module.exports.processFile = (event, context, callback) => {
  const csv = require('fast-csv');
  const s3 = new AWS.S3();
  const dynamodb = new AWS.DynamoDB();

  async.waterfall([
    (next) => {
      console.log('Waiting until the uploaded object becomes available',
        '[bucket = ', event.bucketName, ', key = ',
        event.objectKey, '  ]');
      s3.waitFor('objectExists', {
        Bucket: event.bucketName,
        Key: event.objectKey
      }, next);
    },
    (result, next) => {
      console.log('Downloading the CSV file from S3 [bucket = ',
        event.bucketName, ', key = ', event.objectKey, '  ]');

      const csvStream = s3.getObject({
        Bucket: event.bucketName,
        Key: event.objectKey
      }).createReadStream();

      csv.fromStream(csvStream).on('data', (data) => {
        dynamodb.putItem({
          Item: {
            'sensor_id': {
              'S': data[0]
            },
            'timestamp': {
              'N': data[1]
            },
            'value': {
              'N': data[2]
            }
          },
          TableName: "sensor_data"
        });
      });

      next(null);
    },
  ], (err, results) => {
    if (err) {
      console.log('Failed execution');
      return context.fail('Execution failed');
    } else {
      console.log('Successful execution');
      return context.succeed(event);
    }
  });
};

Note the first function in the waterfall chain. By adding an AWS waiter call we are protecting against the drawback of a distributed nature of the AWS S3. Sometimes, when a Lambda function is being triggered by the S3 event, an attempt to read object results in “Object doesn’t exist” error. It’s because the S3 objects are eventually consistent. More details can be found in the official documentation.

Lambda to move processed files

Let’s proceed with the second Lambda function that will be moving files to the “processed” folder. Here is the relevant code fragment

module.exports.moveFile = function (event, context) {
  const objectKey = event.objectKey;
  const bucketName = event.bucketName;
  const newLocation = 'processed/' + objectKey;
  const targetBucket = process.env.TARGET_BUCKET;
  const s3 = new AWS.S3();

  console.log('Moving "', objectKey, '" to new location "', newLocation, '"');
  async.waterfall([
    (next) => {
      s3.copyObject({
        Bucket: targetBucket,
        Key: newLocation,
        CopySource: bucketName + '/' + encodeURIComponent(objectKey)
      }, next);
    },
    (data, next) => {
      s3.waitFor('objectExists', {
        Bucket: targetBucket,
        Key: newLocation
      }, next);
    },
    (data, next) => {
      s3.deleteObject({
        Bucket: bucketName,
        Key: objectKey
      }, next);
    }
  ], (error) => {
    if (error) {
      console.log('Failed to move file', error);
      context.fail();
    } else {
      context.succeed({
        bucketName: event.bucketName,
        objectKey: event.objectKey,
        newLocation: newLocation
      });
    }
  });
};

Once the copying is complete, we can proceed with sending a notification.

Lambda to send email notifications

My favorite part about the AWS is the fact that in 99% of cases Amazon has an easy-to-use and scalable service to solve the problem. Of course, there is one for sending emails — Amazon Simple Email Service. We’ll use it to notify ourselves once the file processing is finished. The Lambda function responsible for it is shown below:

module.exports.sendEmail = function (event, context) {
  const objectKey = event.objectKey;
  const bucketName = event.sourceBucket;
  const ses = new AWS.SES();

  console.log('Sending an email about "', objectKey, '"');
  async.waterfall([
    (next) => {
      ses.sendEmail({
        Destination: {
          ToAddresses: [process.env.DEST_EMAIL]
        },
        Message: {
          Body: {
            Text: {
              Data: 'Processed file ' + objectKey
            }
          },
          Subject: {
            Data: 'File processed'
          }
        },
        Source: process.env.DEST_EMAIL
      }, next);
    }], (err, results) => {
      if (err) {
        console.log('Failed to send an email', err);
        context.fail();
      } else {
        context.succeed("OK");
      }
    });
};

As I mentioned before, we are going to notify the user in 10 minutes. As you can see, there are no any delays in the Lambda code. Furthermore, having such delays would affect the total cost of the solution, since AWS Lambda costs are based on the time of the execution. The more efficient our Lambda functions, the less money they cost. They delay instead will be implemented as a part of the workflow.

Creating the workflow

Luckily there is a Step Functions plugin for serverless that can be used to describe workflows using YAML:

stepFunctions:
  stateMachines:
    stepfunctionsdemo:
      Comment: "Example StepFunction"
      StartAt: ProcessFile
      States:
        ProcessFile:
          Type: Task
          Resource: processFile
          Next: MoveFile      
        MoveFile:
          Type: Task
          Resource: moveFIle
          Next: SendEmail
        SendEmail:
          Type: Task
          Resource: sendEmail
          End: true

Recently Amazon announced the support for the Step Functions in CloudFormation templates, but using the serverless still saves a lot of time and reduces the amount of a boilerplate code.

Deployment

To deploy functions and create all the needed infrastructure run following commands of the serverless:

  serverless deploy
  serverless deploy stepf

After the deployment is complete, try uploading a CSV file to the source bucket and check the AWS Step Functions console. Select the created step function and check it’s executions. If everything was configured correctly, you should see a successful execution:

It’s possible to get execution logs, input, and output for every node by clicking it. Detailed logs for each Lambda function can be found in the CloudWatch console also.

Common issues and ways to solve them

Here are some common issues that may prevent the code from working correctly:

  1. Step function and underlying Lambda functions work correctly, but no log records are present in the CloudWatch. In 99% the root cause is missing CloudWatch permissions for the Lambda execution role. Remaining 1% can be split into two parts — 0.5% for cases when you are looking for logs in the incorrect region, another 0.5% cases are caused by the fact it takes several seconds for log streams to appear in the CloudWatch console.

  2. Step-function is not being executed — make sure the Lambda execution role has corresponding permissions to list and execute step functions.

  3. AWS Lambda timeout happens even if the function doesn’t have any time-consuming operations in it and “Task timed out after 30.00 seconds” message appears. Such issue occurs when you have a “waitFor” call in your code, but don’t have enough permissions to execute the GetObject operation. In this case, AWS waiter will continue querying the S3 again and again (20 times each 5 seconds by default).

Limitations

AWS Step Functions is a rather young service, meaning it still has some space for improvements. Here are some limitations we faced while working on our projects:

  1. Step Functions Console display only last 1000 executions for each step function. Once the number of executions crosses 1000, this value will be frozen by making the console less informative.

  2. Deleting a step function and creating a new one with the same name immediately results in a very strange behavior of the Step Functions console. On each page refresh, it may display data both from new and old step function.

  3. Some of the step functions that don’t have active executions still can’t be deleted and will remain in the “Deleting” state forever.

Conclusion

AWS Step Functions is a great tool to create sophisticated workflows and state machines for zero-infrastructure applications. There are still many areas which should be improved, but even in its current state AWS Step Functions can be used to solve real-world problems.

Useful links

Here are some documents that can be useful when developing your own step functions:

Comments

©2016-2017 — AgileVision sp. z o.o. All rights reserved.