Amazon Kinesis Data Analytics
SQL Reference

JOIN clause

Amazon Kinesis Data Analytics supports joining a stream with another stream and/or joining a stream with a reference table.

Syntax Chart for the JOIN Clause

A join combines two relations according to some condition. The relation resulting from a join has the columns of the left and right inputs to the join.

Join types

There are five types of joins:

INNER JOIN (or just JOIN)

Returns all pairs of rows from left and right for which the join condition evaluates to TRUE.

LEFT OUTER JOIN (or just LEFT JOIN)

Like INNER JOIN, but rows from the left are kept even if they do not match any rows on the right; NULL values are generated on the right for these rows.

RIGHT OUTER JOIN (or just RIGHT JOIN)

Like INNER JOIN, but rows from the right are kept even if they do not match any rows on the left; NULL values are generated on the left for these rows.

FULL OUTER JOIN (or just FULL JOIN)

Like INNER JOIN, but rows from both sides are kept even if they do not match any rows on the other side; NULL values are generated on the other side for these rows.

CROSS JOIN

Returns the Cartesian product of the inputs: every row from the left is paired with every row from the right. A cross join never has an ON or USING condition.
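The following Python sketch models the five join types over small in-memory lists to make the NULL-padding rules concrete. It is only an illustration, not how Amazon Kinesis Data Analytics evaluates joins. Representing rows as dicts and NULL as None, as well as the names inner_join, left_join, right_join, full_join, cross_join, and same_order, are assumptions made for this example.

```python
from itertools import product

# Hypothetical sample rows; a row is a dict, None stands in for SQL NULL.
orders = [{"orderId": 100}, {"orderId": 102}]
shipments = [{"orderId": 100}, {"orderId": 103}]


def same_order(o, s):
    # Join condition: o.orderId = s.orderId
    return o["orderId"] == s["orderId"]


def inner_join(left, right, cond):
    # Every (left, right) pair for which the condition is TRUE.
    return [(lrow, rrow) for lrow, rrow in product(left, right) if cond(lrow, rrow)]


def left_join(left, right, cond):
    out = []
    for lrow in left:
        matches = [(lrow, rrow) for rrow in right if cond(lrow, rrow)]
        # An unmatched left row is kept once, with NULL (None) on the right.
        out.extend(matches or [(lrow, None)])
    return out


def right_join(left, right, cond):
    # Mirror image of left_join: swap the inputs, then swap each pair back.
    flipped = left_join(right, left, lambda rrow, lrow: cond(lrow, rrow))
    return [(lrow, rrow) for rrow, lrow in flipped]


def full_join(left, right, cond):
    out = left_join(left, right, cond)
    # Add the right rows that matched no left row, with NULL on the left.
    out += [(None, rrow) for rrow in right
            if not any(cond(lrow, rrow) for lrow in left)]
    return out


def cross_join(left, right):
    # Cartesian product: no condition at all.
    return list(product(left, right))


print(len(inner_join(orders, shipments, same_order)))  # 1
print(len(full_join(orders, shipments, same_order)))   # 3
print(len(cross_join(orders, shipments)))              # 4
```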

For information about streaming joins, see Streaming JOINs, which includes stream-to-stream joins and conceptual stream-to-table joins using a UDX.

The NATURAL keyword is actually a join condition rather than a join type. It is described with ON and USING in Join Conditions later in this topic.

Join Conditions

All types of join except CROSS JOIN accept a join condition.

There are three ways to specify a join condition:

  • The ON condition evaluates a Boolean condition. It is the most general and powerful way to specify a join condition.

  • USING (column {, column }...) matches columns from left and right. For each named column, left and right must both have a column of that name. r1 JOIN r2 USING (c1, c2) is equivalent to r1 JOIN r2 ON r1.c1 = r2.c1 AND r1.c2 = r2.c2.

  • Inserting the NATURAL keyword before INNER, LEFT, RIGHT, or FULL JOIN creates a condition that matches each pair of columns that have the same name on the left and right sides of the join.
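A minimal Python sketch of the USING and NATURAL rules above follows. It assumes rows are dicts keyed by column name, and the helper names using_condition and natural_columns are hypothetical. The sketch builds the equivalent ON condition and treats None as SQL NULL, which never compares equal, not even to another NULL.

```python
def using_condition(columns):
    """Build the ON condition equivalent to USING (c1, c2, ...).

    r1 JOIN r2 USING (c1, c2) behaves like
    r1 JOIN r2 ON r1.c1 = r2.c1 AND r1.c2 = r2.c2.
    """
    def cond(left_row, right_row):
        # In SQL, NULL = NULL is not TRUE, so a NULL key never matches.
        return all(
            left_row[c] is not None and left_row[c] == right_row[c]
            for c in columns
        )
    return cond


def natural_columns(left_columns, right_columns):
    """NATURAL: every column name present on both the left and the right."""
    return [c for c in left_columns if c in right_columns]


r1 = {"c1": 1, "c2": "a", "x": 10}
r2 = {"c1": 1, "c2": "a", "y": 20}

common = natural_columns(list(r1), list(r2))
print(common)                           # ['c1', 'c2']
print(using_condition(common)(r1, r2))  # True
```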

A WHERE clause (WHERE Condition Clause) can achieve a similar effect to ON, except that it filters rows after they have been emitted from the join. For an inner join, a condition in WHERE is equivalent to the same condition in ON. For an outer join, however, the NULL-extended rows are generated correctly only if the condition is evaluated for each pair of candidate rows during the join, and a WHERE clause cannot do that. For more details, see the WHERE clause topic in this guide.
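The sketch below illustrates the difference for an outer join. It applies the same predicate to a LEFT JOIN twice, once in the ON condition and once as a WHERE filter. The orders/prices rows and the price > 5 predicate are hypothetical, and None stands in for NULL.

```python
def left_join(left, right, cond):
    out = []
    for lrow in left:
        matches = [(lrow, rrow) for rrow in right if cond(lrow, rrow)]
        out.extend(matches or [(lrow, None)])  # None plays the role of NULL
    return out


orders = [{"orderId": 100}, {"orderId": 101}]
prices = [{"orderId": 100, "price": 9}, {"orderId": 101, "price": 3}]

# Predicate in ON: evaluated for each candidate pair, so order 101 fails the
# match but is still emitted, NULL-extended on the right.
on_result = left_join(
    orders, prices,
    lambda o, p: o["orderId"] == p["orderId"] and p["price"] > 5,
)

# Same predicate in WHERE: the join runs first (both orders match), then the
# filter discards order 101. A NULL price would also fail the filter.
joined = left_join(orders, prices, lambda o, p: o["orderId"] == p["orderId"])
where_result = [(o, p) for o, p in joined if p is not None and p["price"] > 5]

print(len(on_result), len(where_result))  # 2 1
```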

Streaming JOINs

JOIN can be used in a streaming query provided that at least one of the relations being joined is a stream. Streaming joins work just like regular table joins, but are subject to the considerations implicit in dealing with streams, that is, rolling windows and rowtimes:

  • Rolling windows -- A window defined on a stream is a rolling (sliding) window. As the current time progresses, the window excludes some rows while adding others. As a result, rows output by a join with a sliding window are generated incrementally. Note that an output row is produced only once for a match on a given pair of rows from the left and right input streams. In other words, an output row already produced by a prior match is not produced again by a subsequent identical match.

  • Output rowtimes -- All output rows are produced in order of non-descending rowtime. (It is valid to have multiple output rows with the same rowtime.) As a rule, the rowtime of a given output row is the rowtime at the point it was possible to calculate the output row. In other words:

      • For an inner join, the rowtime of an output row is the later of the rowtimes of the two input rows. This is also true for an outer join in which matching input rows are found.

      • For outer joins in which a match is not found, the rowtime of an output row is the later of the following two times:

          (a) the rowtime of the input row for which a match was not found, or

          (b) the later bound of the window of the other input stream at the point any possible match passed out of the window.
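The rowtime rules can be written as a small Python function. This sketch assumes rowtimes are represented as minutes since midnight, and the names output_rowtime, t, and window_bound are hypothetical. The unmatched case is illustrated with an assumed outer-join variant of the Orders/Shipments example later in this topic. In that variant, order 102 (10:20) is unmatched, and the last possible match time is taken to be 11:20, the end of its one-hour window.

```python
def t(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


def output_rowtime(row_time, match_time=None, window_bound=None):
    """Rowtime of a join output row, following the rules listed above.

    row_time:     rowtime of the input row being emitted.
    match_time:   rowtime of the matching row on the other side, if any.
    window_bound: for an unmatched outer-join row, the later bound of the
                  other stream's window when any possible match passed out
                  of that window (assumed to be known here).
    """
    if match_time is not None:
        # Inner join, or outer join with a match: the later of the two rowtimes.
        return max(row_time, match_time)
    if window_bound is None:
        raise ValueError("an unmatched row needs window_bound")
    # Unmatched outer-join row: the later of (a) and (b).
    return max(row_time, window_bound)


print(output_rowtime(t("10:45"), t("10:00")) == t("10:45"))             # True
print(output_rowtime(t("10:20"), window_bound=t("11:20")) == t("11:20"))  # True
```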

All streaming joins are implicitly windowed joins between the affected streams. If no explicit window is specified, the window specification CURRENT ROW is used.

Stream-to-table Joins

If one of the relations is a stream and the other is a finite relation, it is referred to as a stream-table join. For each row in the stream, the query looks up the row or rows in the table that match the join condition. Amazon Kinesis Data Analytics accomplishes conceptual stream-table joins by using a UDX named TableLookup.

For example, Orders is a stream and PriceList is a table. The effect of the join is to add price list information to the order.
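Conceptually, the stream-table join behaves like a per-row lookup. The Python sketch below shows only that effect and does not use or imitate the actual interface of the TableLookup UDX. The price_list contents, the item column, and the lookup_join name are hypothetical. Unmatched orders are dropped, as in an inner join.

```python
from typing import Dict, Iterable, Iterator

# Hypothetical PriceList table: a finite relation keyed by item.
price_list: Dict[str, float] = {"apple": 0.50, "pear": 0.75}

# Hypothetical Orders stream rows, in rowtime order.
orders = [
    {"rowtime": "10:00", "item": "apple", "qty": 3},
    {"rowtime": "10:05", "item": "kiwi", "qty": 1},
    {"rowtime": "10:07", "item": "pear", "qty": 2},
]


def lookup_join(stream: Iterable[dict], table: Dict[str, float]) -> Iterator[dict]:
    """For each stream row, look up the matching table row (inner-join semantics)."""
    for row in stream:
        price = table.get(row["item"])
        if price is not None:  # no matching table row: the stream row is dropped
            # Copy the stream row (including its rowtime) and add the price.
            yield {**row, "price": price}


for out in lookup_join(orders, price_list):
    print(out)
```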

Stream-to-stream Joins

If both of the relations being joined are streams, it is referred to as a stream-stream join. Clearly it is not practical to join the entire history of the left stream to the entire history of the right, so at least one stream must be restricted to a time window by the use of an OVER clause.

The OVER clause defines a window of rows that are to be considered for joining at a given time.

The window can be time-based or row-based:

  • A time-based window uses the RANGE keyword, and defines the window as the set of rows whose ROWTIME column falls within a particular time interval of the query's current time.

  • A row-based window uses the ROWS keyword, and defines the window as a given count of rows before or after the row with the current timestamp.
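The two kinds of window can be modeled as filters over (rowtime, orderId) pairs. This Python sketch uses the Orders rows from the example below, with rowtimes in minutes. Two details are assumptions made for illustration: the boundaries are treated as inclusive, and ROWS n PRECEDING is read as the latest row at or before the current time plus up to n earlier rows.

```python
def t(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


# Orders rows: (rowtime in minutes, orderId).
orders = [(t("10:00"), 100), (t("10:10"), 101), (t("10:20"), 102),
          (t("10:25"), 103), (t("10:40"), 104)]


def range_window(rows, now, width):
    """RANGE INTERVAL <width> PRECEDING: rows with now - width <= rowtime <= now."""
    return [r for r in rows if now - width <= r[0] <= now]


def rows_window(rows, now, count):
    """ROWS <count> PRECEDING: the latest row at or before `now`, plus up to
    `count` rows before it."""
    seen = [r for r in rows if r[0] <= now]
    return seen[-(count + 1):]


print([oid for _, oid in range_window(orders, t("11:15"), 60)])  # [102, 103, 104]
print([oid for _, oid in rows_window(orders, t("11:15"), 1)])    # [103, 104]
```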

To illustrate, let's look at an example.

SELECT STREAM ROWTIME, o.orderId, o.ROWTIME AS orderTime
FROM Shipments AS s
JOIN Orders OVER (RANGE INTERVAL '1' HOUR PRECEDING) AS o
  ON o.orderId = s.orderId;

We have not specified an OVER clause for the Shipments stream, so at any given moment the query matches the current row of the Shipments stream against rows from the Orders stream in the preceding hour.

Consider the following Orders and Shipments input rows, listed in rowtime order:

rowtime  stream     orderId
10:00    Orders     100
10:10    Orders     101
10:20    Orders     102
10:25    Orders     103
10:30    Shipments  101
10:40    Orders     104
10:45    Shipments  100
10:55    Shipments  103
11:05    Shipments  103
11:30    Shipments  104

The output rows are as follows:

rowtime  orderId  orderTime
10:30    101      10:10
10:45    100      10:00
10:55    103      10:25
11:05    103      10:25
11:30    104      10:40

First of all, notice that output rows have the timestamp of the Shipments.ROWTIME column, and are sorted in that order. Orders 100, 101, and 104 are each matched by a shipment within the window, and order 103 is matched by two shipments. Order 102 is omitted because no shipment for it arrives within its one-hour window, 10:20-11:20.
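The example can be checked with a small Python simulation. It replays the Shipments rows in rowtime order and matches each one against the Orders rows in the preceding hour. It is a batch re-computation for illustration, not the incremental Kinesis Data Analytics engine. The helper names and the inclusive window boundaries are assumptions.

```python
def t(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


def fmt(minutes):
    """Convert minutes since midnight back to 'HH:MM'."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


orders = [("10:00", 100), ("10:10", 101), ("10:20", 102),
          ("10:25", 103), ("10:40", 104)]
shipments = [("10:30", 101), ("10:45", 100), ("10:55", 103),
             ("11:05", 103), ("11:30", 104)]


def windowed_join(shipments, orders, window=60):
    """Shipments (current row) JOIN Orders OVER (RANGE <window> minutes PRECEDING)."""
    out = []
    # Process shipments in rowtime order so output is non-descending in rowtime.
    for s_time, s_id in sorted(shipments, key=lambda row: t(row[0])):
        now = t(s_time)
        for o_time, o_id in orders:
            in_window = now - window <= t(o_time) <= now
            if in_window and o_id == s_id:
                # Output rowtime: the later of the two input rowtimes.
                out.append((fmt(max(now, t(o_time))), o_id, o_time))
    return out


for row in windowed_join(shipments, orders):
    print(row)
```

It prints the five output rows listed above, and order 102 does not appear.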

The window specification may also be the name of a window defined in the WINDOW Clause (Sliding Windows). However, windows specified by name have the same limitations as windows specified inline; see the subtopic Allowed and Disallowed Window Specifications in the WINDOW clause topic.

Additional window limitations specific to streaming joins are as follows:

  • PARTITION BY must not be present.

  • ORDER BY, if present, must sort by the ROWTIME column of one of the inputs.

Rowtime generation

The timestamp of the generated row is the earliest time at which the rows necessary to make the match are in both sides' windows. For example, in the previous example, the output row (10:45, 100, 10:00) could only be made when the left (Shipments) window is 10:45-10:45 and the right (Orders) window is 09:45-10:45. A minute earlier, the windows would have been 10:44-10:44 and 09:44-10:44, in which case the necessary row would have been in the right-hand window but not the left.
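The earliest match time can be computed as the start of the overlap between the time intervals each row spends in its window. This Python sketch assumes RANGE windows with inclusive bounds. It treats the Shipments side (no OVER clause, so CURRENT ROW) as a window of width zero. The function names are hypothetical.

```python
def t(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


def fmt(minutes):
    """Convert minutes since midnight back to 'HH:MM'."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


def earliest_match_time(left_time, left_width, right_time, right_width):
    """Earliest time T at which both rows sit in their windows.

    A row with rowtime r is in a RANGE window of width w at time T when
    T - w <= r <= T, that is, when r <= T <= r + w. The answer is the start
    of the overlap of the two intervals, or None if they do not overlap.
    """
    start = max(left_time, right_time)
    end = min(left_time + left_width, right_time + right_width)
    return start if start <= end else None


# Shipment 100 at 10:45 (CURRENT ROW, width 0) and order 100 at 10:00 (1 hour).
print(fmt(earliest_match_time(t("10:45"), 0, t("10:00"), 60)))  # 10:45
```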

Rowtime bounds

Rowtime bounds received on the left and right sides of the join help the join make progress. In the above example, the (10:30, 101, 10:10) output record can be emitted only when order 100's window has expired. That expiration was only evident when the (11:05, 103) row arrived in the Shipments stream, taking the time past 11:00. If the process writing the Shipments stream had sent an 11:01 rowtime bound, the output record could have been emitted 4 minutes sooner.
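The 4-minute figure follows from a simple model. In that model, the join can act on order 100's expiry at 11:00 only when some event on the Shipments stream, either a row or a rowtime bound, carries a time past 11:00. The Python sketch below reproduces that arithmetic. It is a simplified model, not the engine's scheduling logic, and the function name is hypothetical.

```python
def t(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


def first_event_past(event_times, deadline):
    """Earliest event time (row or rowtime bound) that is later than deadline."""
    return min((e for e in event_times if e > deadline), default=None)


shipment_rows = [t(x) for x in ("10:30", "10:45", "10:55", "11:05", "11:30")]
expiry = t("10:00") + 60  # order 100's one-hour window ends at 11:00

without_bound = first_event_past(shipment_rows, expiry)              # the 11:05 row
with_bound = first_event_past(shipment_rows + [t("11:01")], expiry)  # the 11:01 bound
print(without_bound - with_bound)  # 4
```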

Multi-way JOINs

To do a three-way join, use a joined table-reference as the table-reference in a JOIN statement. In the following example, stream/table 1 (b1) relates to stream/table 2 (asks), and stream/table 2 relates to stream/table 3 (b2), on the column "ticker".

select stream *
from bids over (range interval '1' hour preceding) as b1
    join asks over (range interval '2' second preceding)
        on b1."ticker" = asks."ticker"
    join bids over (range interval '3' minute preceding) as b2
        on b2."ticker" = asks."ticker";
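To see how the joined table-reference composes, here is a Python sketch that evaluates the three-way join as a snapshot at a single time. It applies each window, joins b1 to asks on ticker, and then joins that result to b2. The bids/asks rows, the rowtimes in seconds, and the snapshot approach are assumptions for illustration. A real streaming join emits rows incrementally.

```python
def in_window(rows, now, width):
    """RANGE <width> PRECEDING at time `now` (rowtimes in seconds, inclusive)."""
    return [r for r in rows if now - width <= r["rowtime"] <= now]


def three_way_join(bids, asks, now):
    b1 = in_window(bids, now, 3600)  # bids over (range interval '1' hour preceding)
    a = in_window(asks, now, 2)      # asks over (range interval '2' second preceding)
    b2 = in_window(bids, now, 180)   # bids over (range interval '3' minute preceding)
    # (b1 JOIN asks ON b1.ticker = asks.ticker) is the left table-reference ...
    b1_asks = [(x, y) for x in b1 for y in a if x["ticker"] == y["ticker"]]
    # ... of the second JOIN, ON b2.ticker = asks.ticker.
    return [(x, y, z) for x, y in b1_asks for z in b2 if z["ticker"] == y["ticker"]]


# Hypothetical rows; rowtime is seconds since some origin.
bids = [{"rowtime": 0, "ticker": "ABC"},
        {"rowtime": 3500, "ticker": "ABC"},
        {"rowtime": 3550, "ticker": "XYZ"}]
asks = [{"rowtime": 3599, "ticker": "ABC"},
        {"rowtime": 3599, "ticker": "XYZ"}]

for b1_row, ask_row, b2_row in three_way_join(bids, asks, now=3600):
    print(b1_row["rowtime"], ask_row["ticker"], b2_row["rowtime"])
```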