Examples

Importing FunSQL

FunSQL does not export any symbols by default. The following statement imports all available query constructors and the function render.

using FunSQL:
    FunSQL, Agg, Append, As, Asc, Bind, CrossJoin, Define, Desc, Fun, From,
    Get, Group, Highlight, Iterate, Join, LeftJoin, Limit, Lit, Order,
    Partition, Select, Sort, Var, Where, With, WithExternal, render

Establishing a database connection

We use FunSQL to assemble SQL queries. To actually run these queries, we need a regular database library such as SQLite.jl, LibPQ.jl, MySQL.jl, or ODBC.jl.

In the following examples, we use a SQLite database containing a tiny sample of the CMS DE-SynPuf dataset. See the Usage Guide for the description of the database schema.

Download the database file.

const URL = "https://github.com/MechanicalRabbit/ohdsi-synpuf-demo/releases/download/20210412/synpuf-10p.sqlite"
const DATABASE = download(URL)

Download the database file as an artifact.

using Pkg.Artifacts, LazyArtifacts

const DATABASE = joinpath(artifact"synpuf-10p", "synpuf-10p.sqlite")
#-> ⋮

Create a connection object.

using SQLite

const conn = DBInterface.connect(FunSQL.DB{SQLite.DB}, DATABASE)

Database connection with LibPQ.jl

To create a connection object, FunSQL relies on the DBInterface.jl package. Unfortunately LibPQ.jl, the PostgreSQL client library, does not support DBInterface. To make DBInterface.connect work, we need to manually bridge LibPQ and DBInterface.

using LibPQ
using DBInterface

DBInterface.connect(::Type{LibPQ.Connection}, args...; kws...) =
    LibPQ.Connection(args...; kws...)

DBInterface.prepare(conn::LibPQ.Connection, args...; kws...) =
    LibPQ.prepare(conn, args...; kws...)

DBInterface.execute(conn::Union{LibPQ.Connection, LibPQ.Statement}, args...; kws...) =
    LibPQ.execute(conn, args...; kws...)

Now we can create a FunSQL connection using DBInterface.connect.

const conn = DBInterface.connect(FunSQL.DB{LibPQ.Connection}, …)

SELECT * FROM table

FunSQL does not require that a query object contains Select, so a minimal FunSQL query consists of a single From node.

Show all patient records.

q = From(:person)

We use the function render to serialize the query node as a SQL statement.

sql = render(conn, q)

print(sql)
#=>
SELECT
  "person_1"."person_id",
  ⋮
  "person_1"."ethnicity_source_concept_id"
FROM "person" AS "person_1"
=#

This query could be executed with DBInterface.execute.

res = DBInterface.execute(conn, sql)

To display the output of a query, it is convenient to use the DataFrame interface.

using DataFrames

DataFrame(res)
#=>
10×18 DataFrame
 Row │ person_id  gender_concept_id  year_of_birth  month_of_birth  day_of_bir ⋯
     │ Int64      Int64              Int64          Int64           Int64      ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │      1780               8532           1940               2             ⋯
   2 │     30091               8532           1932               8
   3 │     37455               8532           1913               7
   4 │     42383               8507           1922               2
   5 │     69985               8532           1956               7             ⋯
   6 │     72120               8507           1937              10
   7 │     82328               8532           1957               9
   8 │     95538               8507           1923              11
   9 │    107680               8532           1963              12             ⋯
  10 │    110862               8507           1911               4
                                                              14 columns omitted
=#

We could also directly apply DBInterface.execute to the query node in order to render and immediately execute it.

DBInterface.execute(conn, q) |> DataFrame
#=>
10×18 DataFrame
⋮
=#

WHERE, ORDER, LIMIT

Tabular operations such as Where, Order, and Limit are available in FunSQL. Unlike SQL, FunSQL lets you apply them in any order.

Show the top 3 oldest male patients.

q = From(:person) |>
    Where(Get.gender_concept_id .== 8507) |>
    Order(Get.year_of_birth) |>
    Limit(3)

render(conn, q) |> print
#=>
SELECT
  "person_1"."person_id",
  ⋮
  "person_1"."ethnicity_source_concept_id"
FROM "person" AS "person_1"
WHERE ("person_1"."gender_concept_id" = 8507)
ORDER BY "person_1"."year_of_birth"
LIMIT 3
=#

DBInterface.execute(conn, q) |> DataFrame
#=>
3×18 DataFrame
 Row │ person_id  gender_concept_id  year_of_birth  month_of_birth  day_of_bir ⋯
     │ Int64      Int64              Int64          Int64           Int64      ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │    110862               8507           1911               4             ⋯
   2 │     42383               8507           1922               2
   3 │     95538               8507           1923              11
                                                              14 columns omitted
=#

Show all males among the top 3 oldest patients.

q = From(:person) |>
    Order(Get.year_of_birth) |>
    Limit(3) |>
    Where(Get.gender_concept_id .== 8507)

render(conn, q) |> print
#=>
SELECT
  "person_2"."person_id",
  ⋮
  "person_2"."ethnicity_source_concept_id"
FROM (
  SELECT
    "person_1"."person_id",
    ⋮
    "person_1"."ethnicity_source_concept_id"
  FROM "person" AS "person_1"
  ORDER BY "person_1"."year_of_birth"
  LIMIT 3
) AS "person_2"
WHERE ("person_2"."gender_concept_id" = 8507)
=#

DBInterface.execute(conn, q) |> DataFrame
#=>
2×18 DataFrame
 Row │ person_id  gender_concept_id  year_of_birth  month_of_birth  day_of_bir ⋯
     │ Int64      Int64              Int64          Int64           Int64      ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │    110862               8507           1911               4             ⋯
   2 │     42383               8507           1922               2
                                                              14 columns omitted
=#

SELECT COUNT(*) FROM table

To calculate an aggregate value for the whole dataset, we apply a Group node without arguments.

Show the number of patient records.

q = From(:person) |>
    Group() |>
    Select(Agg.count())

render(conn, q) |> print
#=>
SELECT count(*) AS "count"
FROM "person" AS "person_1"
=#

DBInterface.execute(conn, q) |> DataFrame
#=>
1×1 DataFrame
 Row │ count
     │ Int64
─────┼───────
   1 │    10
=#

SELECT DISTINCT

If we use a Group node, but do not apply any aggregate functions, FunSQL will render it as a SELECT DISTINCT clause.

Show all US states present in the location records.

q = From(:location) |>
    Group(Get.state)

render(conn, q) |> print
#=>
SELECT DISTINCT "location_1"."state"
FROM "location" AS "location_1"
=#

DBInterface.execute(conn, q) |> DataFrame
#=>
10×1 DataFrame
 Row │ state
     │ String
─────┼────────
   1 │ MI
   2 │ WA
   3 │ FL
   4 │ MD
   5 │ NY
   6 │ MS
   7 │ CO
   8 │ GA
   9 │ MA
  10 │ IL
=#

Generating a complex CASE clause

Show the number of patients stratified by the age group.

In this query, we need to place a person's age into one of the age buckets: 0 – 4, 5 – 9, 10 – 14, …, 95 – 99, 100 +. This is a tedious expression to write in raw SQL, but it could be written very compactly in FunSQL by using array comprehension to build the CASE expression.

using Dates

PersonAgeAt(date) =
    Fun.strftime("%Y", date) .- Get.year_of_birth

AgeGroup(age) =
    Fun.case(Iterators.flatten([(age .< y, "$(y-5) - $(y-1)")
                                for y = 5:5:100])...,
             "≥ 100")

q = From(:person) |>
    Group(:age_group => AgeGroup(PersonAgeAt(Date("2020-01-01")))) |>
    Order(Get.age_group) |>
    Select(Get.age_group, Agg.count())

render(conn, q) |> print
#=>
SELECT
  "person_2"."age_group",
  count(*) AS "count"
FROM (
  SELECT (CASE WHEN ((strftime('%Y', '2020-01-01') - "person_1"."year_of_birth") < 5) THEN '0 - 4' … ELSE '≥ 100' END) AS "age_group"
  FROM "person" AS "person_1"
) AS "person_2"
GROUP BY "person_2"."age_group"
ORDER BY "person_2"."age_group"
=#

DBInterface.execute(conn, q) |> DataFrame
#=>
6×2 DataFrame
 Row │ age_group  count
     │ String     Int64
─────┼──────────────────
   1 │ 55 - 59        1
   2 │ 60 - 64        2
   3 │ 80 - 84        2
   4 │ 85 - 89        1
   5 │ 95 - 99        2
   6 │ ≥ 100          2
=#

Filtering output columns

By default, the From node outputs all columns of a table, but we could restrict or change the list of output columns using Select. Typically, we would directly pass the definitions of output columns as individual arguments of Select, but occasionally it is convenient to generate the definitions programmatically.

Filter out all "source" columns from patient records.

const person_table = conn.catalog[:person]

is_not_source_column(c::Symbol) =
    !contains(String(c), "source")

q = From(:person) |>
    Select(args = [Get(c) for c in keys(person_table.columns) if is_not_source_column(c)])

display(q)
#=>
let q1 = From(:person),
    q2 = q1 |>
         Select(Get.person_id,
                Get.gender_concept_id,
                Get.year_of_birth,
                Get.month_of_birth,
                Get.day_of_birth,
                Get.time_of_birth,
                Get.race_concept_id,
                Get.ethnicity_concept_id,
                Get.location_id,
                Get.provider_id,
                Get.care_site_id)
    q2
end
=#

render(conn, q) |> print
#=>
SELECT
  "person_1"."person_id",
  "person_1"."gender_concept_id",
  "person_1"."year_of_birth",
  "person_1"."month_of_birth",
  "person_1"."day_of_birth",
  "person_1"."time_of_birth",
  "person_1"."race_concept_id",
  "person_1"."ethnicity_concept_id",
  "person_1"."location_id",
  "person_1"."provider_id",
  "person_1"."care_site_id"
FROM "person" AS "person_1"
=#

DBInterface.execute(conn, q) |> DataFrame
#=>
10×11 DataFrame
 Row │ person_id  gender_concept_id  year_of_birth  month_of_birth  day_of_bir ⋯
     │ Int64      Int64              Int64          Int64           Int64      ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │      1780               8532           1940               2             ⋯
   2 │     30091               8532           1932               8
   3 │     37455               8532           1913               7
   4 │     42383               8507           1922               2
   5 │     69985               8532           1956               7             ⋯
   6 │     72120               8507           1937              10
   7 │     82328               8532           1957               9
   8 │     95538               8507           1923              11
   9 │    107680               8532           1963              12             ⋯
  10 │    110862               8507           1911               4
                                                               7 columns omitted
=#

Output columns of a Join

As is often used to disambiguate the columns of the two input branches of the Join node. By default, columns fenced by As are not present in the output.

q = From(:person) |>
    Join(From(:visit_occurrence) |> As(:visit),
         on = Get.person_id .== Get.visit.person_id)

render(conn, q) |> print
#=>
SELECT
  "person_1"."person_id",
  ⋮
  "person_1"."ethnicity_source_concept_id"
FROM "person" AS "person_1"
JOIN "visit_occurrence" AS "visit_occurrence_1" ON ("person_1"."person_id" = "visit_occurrence_1"."person_id")
=#

q′ = From(:person) |> As(:person) |>
     Join(From(:visit_occurrence),
          on = Get.person.person_id .== Get.person_id)

render(conn, q′) |> print
#=>
SELECT
  "visit_occurrence_1"."visit_occurrence_id",
  ⋮
  "visit_occurrence_1"."visit_source_concept_id"
FROM "person" AS "person_1"
JOIN "visit_occurrence" AS "visit_occurrence_1" ON ("person_1"."person_id" = "visit_occurrence_1"."person_id")
=#

We could use a Select node to output the columns of both branches, however we must ensure that all column names are unique.

const visit_occurrence_table = conn.catalog[:visit_occurrence]

q = q |>
    Select(Get.(keys(person_table.columns))...,
           Get.(keys(visit_occurrence_table.columns), over = Get.visit)...)
#=>
ERROR: FunSQL.DuplicateLabelError: `person_id` is used more than once in:
⋮
=#

q = q |>
    Select(Get.(keys(person_table.columns))...,
           Get.(filter(!in(keys(person_table.columns)), collect(keys(visit_occurrence_table.columns))),
                over = Get.visit)...)

render(conn, q) |> print
#=>
SELECT
  "person_1"."person_id",
  ⋮
  "person_1"."ethnicity_source_concept_id",
  "visit_occurrence_1"."visit_occurrence_id",
  ⋮
  "visit_occurrence_1"."visit_source_concept_id"
FROM "person" AS "person_1"
JOIN "visit_occurrence" AS "visit_occurrence_1" ON ("person_1"."person_id" = "visit_occurrence_1"."person_id")
=#

Querying concepts

Medical terms, such as Inpatient (visit) or Myocardial infarction (condition), are stored in the table concept. Concepts are typically identified by the vocabulary and the code within the vocabulary. For example, Myocardial infarction has a code 22298006 in the SNOMED CT vocabulary.

Concept may be related to each other. For example, Acute myocardial infarction is a subtype of Myocardial infarction. Relationships between concepts are stored in the table concept_relationship with the column relationship_id specifying the type of the relationship.

Querying healthcare information often starts with identifying the set of relevant concepts. For example, a researcher may want to specify a concept set containing

  • Myocardial infarction (SNOMED 22298006);
  • And all the subtypes;
  • But excluding Acute subendocardial infarction (SNOMED 70422006) and its subtypes.

This suggests us to make a FunSQL-based mini-language for querying concept sets. This language will include primitives for fetching concepts by name, or by vocabulary and code, operations for adding related concepts, and combining and excluding concept sets. These operations could be expressed directly in terms of FunSQL queries.

We start with a primitive for finding a concept by its code in the vocabulary.

ConceptByCode(vocabulary, code) =
    From(:concept) |>
    Where(Fun.and(Get.vocabulary_id .== vocabulary,
                  Get.concept_code .== code))

ConceptByCode(vocabulary, codes...) =
    From(:concept) |>
    Where(Fun.and(Get.vocabulary_id .== vocabulary,
                  Fun.in(Get.concept_code, codes...)))

It is convenient to add a shortcut for common vocabularies.

SNOMED(codes...) =
    ConceptByCode("SNOMED", codes...)

VISIT(codes...) =
    ConceptByCode("Visit", codes...)

Now we can define

q = SNOMED("22298006")          # Myocardial infarction

DBInterface.execute(conn, q) |> DataFrame
#=>
1×10 DataFrame
 Row │ concept_id  concept_name           domain_id  vocabulary_id  concept_cl ⋯
     │ Int64       String                 String     String         String     ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │    4329847  Myocardial infarction  Condition  SNOMED         Clinical F ⋯
                                                               6 columns omitted
=#

The following composite query pipeline can be applied to a set of concepts to determine their immediate subtypes.

ImmediateSubtypes() =
    As(:base) |>
    Join(From(:concept_relationship) |>
         Where(Get.relationship_id .== "Is a") |>
         As(:concept_relationship),
         on = Get.base.concept_id .== Get.concept_relationship.concept_id_2) |>
    Join(From(:concept),
         on = Get.concept_relationship.concept_id_1 .== Get.concept_id)

q = SNOMED("22298006") |>       # Myocardial infarction
    ImmediateSubtypes()

DBInterface.execute(conn, q) |> DataFrame
#=>
1×10 DataFrame
 Row │ concept_id  concept_name                 domain_id  vocabulary_id  conc ⋯
     │ Int64       String                       String     String         Stri ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │     312327  Acute myocardial infarction  Condition  SNOMED         Clin ⋯
                                                               6 columns omitted
=#

Recursively applying ImmediateSubtypes with Iterate gives us the concept set together will all subtypes.

WithSubtypes() =
    Iterate(ImmediateSubtypes())

q = SNOMED("22298006") |>       # Myocardial infarction
    WithSubtypes()

DBInterface.execute(conn, q) |> DataFrame
#=>
6×10 DataFrame
 Row │ concept_id  concept_name                       domain_id  vocabulary_id ⋯
     │ Int64       String                             String     String        ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │    4329847  Myocardial infarction              Condition  SNOMED        ⋯
   2 │     312327  Acute myocardial infarction        Condition  SNOMED
   3 │     434376  Acute myocardial infarction of a…  Condition  SNOMED
   4 │     438170  Acute myocardial infarction of i…  Condition  SNOMED
   5 │     438438  Acute myocardial infarction of a…  Condition  SNOMED        ⋯
   6 │     444406  Acute subendocardial infarction    Condition  SNOMED
                                                               6 columns omitted
=#

Finally, we add operations on a concept set for adding or removing concepts.

IncludingConcepts(include) =
    Append(include)

ExcludingConcepts(exclude) =
    LeftJoin(:exclude => exclude,
             Get.concept_id .== Get.exclude.concept_id) |>
    Where(Fun.is_null(Get.exclude.concept_id))

q = SNOMED("22298006") |>       # Myocardial infarction
    WithSubtypes() |>
    ExcludingConcepts(
        SNOMED("70422006") |>   # Acute subendocardial infarction
        WithSubtypes())

DBInterface.execute(conn, q) |> DataFrame
#=>
5×10 DataFrame
 Row │ concept_id  concept_name                       domain_id  vocabulary_id ⋯
     │ Int64       String                             String     String        ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │    4329847  Myocardial infarction              Condition  SNOMED        ⋯
   2 │     312327  Acute myocardial infarction        Condition  SNOMED
   3 │     434376  Acute myocardial infarction of a…  Condition  SNOMED
   4 │     438170  Acute myocardial infarction of i…  Condition  SNOMED
   5 │     438438  Acute myocardial infarction of a…  Condition  SNOMED        ⋯
                                                               6 columns omitted
=#

Given a concept set, it is now easy to find the matching clinical conditions.

MyocardialInfarctionConcepts() =
    SNOMED("22298006") |>       # Myocardial infarction
    WithSubtypes() |>
    ExcludingConcepts(
        SNOMED("70422006") |>   # Acute subendocardial infarction
        WithSubtypes())

q = From(:condition_occurrence) |>
    Join(MyocardialInfarctionConcepts(),
         Get.condition_concept_id .== Get.concept_id) |>
    Order(Get.condition_occurrence_id) |>
    Select(Get.person_id, Get.condition_start_date)

DBInterface.execute(conn, q) |> DataFrame
#=>
6×2 DataFrame
 Row │ person_id  condition_start_date
     │ Int64      String
─────┼─────────────────────────────────
   1 │      1780  2008-04-10
   2 │     37455  2010-08-12
   3 │     69985  2010-05-06
   4 │    110862  2008-09-07
   5 │    110862  2008-09-07
   6 │    110862  2010-06-07
=#

This notation is much more compact and readable than the corresponding SQL query.

render(conn, q) |> print
#=>
WITH RECURSIVE "base_1" ("concept_id") AS (
  SELECT "concept_1"."concept_id"
  FROM "concept" AS "concept_1"
  WHERE
    ("concept_1"."vocabulary_id" = 'SNOMED') AND
    ("concept_1"."concept_code" = '22298006')
  UNION ALL
  SELECT "concept_2"."concept_id"
  FROM "base_1" AS "base_2"
  JOIN (
    SELECT
      "concept_relationship_1"."concept_id_1",
      "concept_relationship_1"."concept_id_2"
    FROM "concept_relationship" AS "concept_relationship_1"
    WHERE ("concept_relationship_1"."relationship_id" = 'Is a')
  ) AS "concept_relationship_2" ON ("base_2"."concept_id" = "concept_relationship_2"."concept_id_2")
  JOIN "concept" AS "concept_2" ON ("concept_relationship_2"."concept_id_1" = "concept_2"."concept_id")
),
"base_4" ("concept_id") AS (
  SELECT "concept_3"."concept_id"
  FROM "concept" AS "concept_3"
  WHERE
    ("concept_3"."vocabulary_id" = 'SNOMED') AND
    ("concept_3"."concept_code" = '70422006')
  UNION ALL
  SELECT "concept_4"."concept_id"
  FROM "base_4" AS "base_5"
  JOIN (
    SELECT
      "concept_relationship_3"."concept_id_1",
      "concept_relationship_3"."concept_id_2"
    FROM "concept_relationship" AS "concept_relationship_3"
    WHERE ("concept_relationship_3"."relationship_id" = 'Is a')
  ) AS "concept_relationship_4" ON ("base_5"."concept_id" = "concept_relationship_4"."concept_id_2")
  JOIN "concept" AS "concept_4" ON ("concept_relationship_4"."concept_id_1" = "concept_4"."concept_id")
)
SELECT
  "condition_occurrence_1"."person_id",
  "condition_occurrence_1"."condition_start_date"
FROM "condition_occurrence" AS "condition_occurrence_1"
JOIN (
  SELECT "base_3"."concept_id"
  FROM "base_1" AS "base_3"
  LEFT JOIN "base_4" AS "base_6" ON ("base_3"."concept_id" = "base_6"."concept_id")
  WHERE ("base_6"."concept_id" IS NULL)
) AS "base_7" ON ("condition_occurrence_1"."condition_concept_id" = "base_7"."concept_id")
ORDER BY "condition_occurrence_1"."condition_occurrence_id"
=#

Assembling queries incrementally

It is often convenient to build a query incrementally, one component at a time. This allows us to validate individual components, inspect their output, and possibly reuse them in other queries.

Find all occurrences of myocardial infarction that was diagnosed during an inpatient visit. Filter out repeating occurrences by requiring a 180-day gap between consecutive events.

We start with generating two datasets: inpatient visits and myocardial infarction conditions. For constructing the concepts Inpatient Visit and Myocardial Infarction, we use the definitions from the section Querying concepts:

MyocardialInfarctionConcept() =
    SNOMED("22298006") |>
    WithSubtypes()

DBInterface.execute(conn, MyocardialInfarctionConcept()) |> DataFrame
#=>
6×10 DataFrame
 Row │ concept_id  concept_name                       domain_id  vocabulary_id ⋯
     │ Int64       String                             String     String        ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │    4329847  Myocardial infarction              Condition  SNOMED        ⋯
   2 │     312327  Acute myocardial infarction        Condition  SNOMED
   3 │     434376  Acute myocardial infarction of a…  Condition  SNOMED
   4 │     438170  Acute myocardial infarction of i…  Condition  SNOMED
   5 │     438438  Acute myocardial infarction of a…  Condition  SNOMED        ⋯
   6 │     444406  Acute subendocardial infarction    Condition  SNOMED
                                                               6 columns omitted
=#

MyocardialInfarctionOccurrence() =
    From(:condition_occurrence) |>
    Join(:concept => MyocardialInfarctionConcept(),
         on = Get.condition_concept_id .== Get.concept.concept_id) |>
    Order(Get.condition_occurrence_id)

DBInterface.execute(conn, MyocardialInfarctionOccurrence()) |> DataFrame
#=>
11×11 DataFrame
 Row │ condition_occurrence_id  person_id  condition_concept_id  condition_sta ⋯
     │ Int64                    Int64      Int64                 String        ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │                  228161       1780                312327  2008-04-10    ⋯
   2 │                 3767773      30091                444406  2009-08-02
   3 │                 4696273      37455                438438  2010-08-12
   4 │                 8701359      69985                444406  2010-07-22
   5 │                 8701405      69985                312327  2010-05-06    ⋯
   6 │                11881327      95538                444406  2009-03-30
   7 │                13374905     107680                444406  2009-07-20
   8 │                13769162     110862                444406  2009-09-30
   9 │                13769189     110862                438170  2008-09-07    ⋯
  10 │                13769190     110862                434376  2008-09-07
  11 │                13769260     110862                312327  2010-06-07
                                                               8 columns omitted
=#

InpatientVisitConcept() =
    VISIT("IP") |>
    WithSubtypes()

DBInterface.execute(conn, InpatientVisitConcept()) |> DataFrame
#=>
2×10 DataFrame
 Row │ concept_id  concept_name        domain_id  vocabulary_id         concep ⋯
     │ Int64       String              String     String                String ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │       9201  Inpatient Visit     Visit      Visit                 Visit  ⋯
   2 │       8717  Inpatient Hospital  Visit      CMS Place of Service  Visit
                                                               6 columns omitted
=#

InpatientVisitOccurrence() =
    From(:visit_occurrence) |>
    Join(:concept => InpatientVisitConcept(),
         on = Get.visit_concept_id .== Get.concept.concept_id)

DBInterface.execute(conn, InpatientVisitOccurrence()) |> DataFrame
#=>
6×12 DataFrame
 Row │ visit_occurrence_id  person_id  visit_concept_id  visit_start_date  vis ⋯
     │ Int64                Int64      Int64             String            Mis ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │               88179       1780              9201  2008-04-09            ⋯
   2 │             1454883      30091              9201  2009-07-30
   3 │             3359790      69985              9201  2010-07-22
   4 │             4586628      95538              9201  2009-03-30
   5 │             5162803     107680              9201  2009-07-20            ⋯
   6 │             5314664     110862              9201  2009-09-30
                                                               8 columns omitted
=#

Using these two datasets, we need to find those conditions that occurred during one of the visits. We start with building a parameterized query that finds visits overlapping with a specified timestamp.

using Dates

CorrelatedInpatientVisit(person_id, date) =
    InpatientVisitOccurrence() |>
    Where(Fun.and(Get.person_id .== Var.PERSON_ID,
                  Fun.between(Var.DATE, Get.visit_start_date, Get.visit_end_date))) |>
    Bind(:PERSON_ID => person_id,
         :DATE => date)

q = CorrelatedInpatientVisit(1780, Date("2008-04-10"))

DBInterface.execute(conn, q) |> DataFrame
#=>
1×12 DataFrame
 Row │ visit_occurrence_id  person_id  visit_concept_id  visit_start_date  vis ⋯
     │ Int64                Int64      Int64             String            Mis ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │               88179       1780              9201  2008-04-09            ⋯
                                                               8 columns omitted
=#

We will use this query to correlate inpatient visits with the date of the diagnosed condition.

MyocardialInfarctionDuringInpatientVisit() =
    MyocardialInfarctionOccurrence() |>
    Where(Fun.exists(CorrelatedInpatientVisit(Get.person_id, Get.condition_start_date)))

DBInterface.execute(conn, MyocardialInfarctionDuringInpatientVisit()) |> DataFrame
#=>
6×11 DataFrame
 Row │ condition_occurrence_id  person_id  condition_concept_id  condition_sta ⋯
     │ Int64                    Int64      Int64                 String        ⋯
─────┼──────────────────────────────────────────────────────────────────────────
   1 │                  228161       1780                312327  2008-04-10    ⋯
   2 │                 3767773      30091                444406  2009-08-02
   3 │                 8701359      69985                444406  2010-07-22
   4 │                11881327      95538                444406  2009-03-30
   5 │                13374905     107680                444406  2009-07-20    ⋯
   6 │                13769162     110862                444406  2009-09-30
                                                               8 columns omitted
=#

Finally, we must exclude any events that occurred within 180 days from the previous event. For this purpose, we build a filtering pipeline:

using Dates

FilterByGap(date, gap) =
    Partition(Get.person_id, order_by = [date]) |>
    Define(:boundary => Agg.lag(Fun.date(date, gap))) |>
    Where(Fun.or(Fun.is_null(Get.boundary),
                 Get.boundary .< date))

To verify that this pipeline operates correctly, we could apply it to a synthetic dataset.

events = DataFrame([(person_id = 1, date = Date("2020-01-01")),    # ✓
                    (person_id = 1, date = Date("2020-02-01")),    # ✗
                    (person_id = 1, date = Date("2021-01-01")),    # ✓
                    (person_id = 1, date = Date("2021-05-01")),    # ✗
                    (person_id = 1, date = Date("2021-10-01")),    # ✗
                    (person_id = 2, date = Date("2020-01-01")),    # ✓
])
#=>
6×2 DataFrame
 Row │ person_id  date
     │ Int64      Date
─────┼───────────────────────
   1 │         1  2020-01-01
   2 │         1  2020-02-01
   3 │         1  2021-01-01
   4 │         1  2021-05-01
   5 │         1  2021-10-01
   6 │         2  2020-01-01
=#

q = From(events) |>
    FilterByGap(Get.date, Day(180))

DBInterface.execute(conn, q) |> DataFrame
#=>
3×3 DataFrame
 Row │ person_id  date        boundary
     │ Int64      String      String?
─────┼───────────────────────────────────
   1 │         1  2020-01-01  missing
   2 │         1  2021-01-01  2020-07-30
   3 │         2  2020-01-01  missing
=#

Now we have all the components to construct the final query:

FilteredMyocardialInfarctionDuringInpatientVisit() =
    MyocardialInfarctionDuringInpatientVisit() |>
    FilterByGap(Get.condition_start_date, Day(180))

q = FilteredMyocardialInfarctionDuringInpatientVisit() |>
    Select(Get.person_id, Get.condition_start_date)

DBInterface.execute(conn, q) |> DataFrame
#=>
6×2 DataFrame
 Row │ person_id  condition_start_date
     │ Int64      String
─────┼─────────────────────────────────
   1 │      1780  2008-04-10
   2 │     30091  2009-08-02
   3 │     69985  2010-07-22
   4 │     95538  2009-03-30
   5 │    107680  2009-07-20
   6 │    110862  2009-09-30
=#

render(conn, q) |> print
#=>
WITH RECURSIVE "base_1" ("concept_id") AS (
  SELECT "concept_1"."concept_id"
  FROM "concept" AS "concept_1"
  WHERE
    ("concept_1"."vocabulary_id" = 'SNOMED') AND
    ("concept_1"."concept_code" = '22298006')
  UNION ALL
  SELECT "concept_2"."concept_id"
  FROM "base_1" AS "base_2"
  JOIN (
    SELECT
      "concept_relationship_1"."concept_id_1",
      "concept_relationship_1"."concept_id_2"
    FROM "concept_relationship" AS "concept_relationship_1"
    WHERE ("concept_relationship_1"."relationship_id" = 'Is a')
  ) AS "concept_relationship_2" ON ("base_2"."concept_id" = "concept_relationship_2"."concept_id_2")
  JOIN "concept" AS "concept_2" ON ("concept_relationship_2"."concept_id_1" = "concept_2"."concept_id")
),
"base_4" ("concept_id") AS (
  SELECT "concept_3"."concept_id"
  FROM "concept" AS "concept_3"
  WHERE
    ("concept_3"."vocabulary_id" = 'Visit') AND
    ("concept_3"."concept_code" = 'IP')
  UNION ALL
  SELECT "concept_4"."concept_id"
  FROM "base_4" AS "base_5"
  JOIN (
    SELECT
      "concept_relationship_3"."concept_id_1",
      "concept_relationship_3"."concept_id_2"
    FROM "concept_relationship" AS "concept_relationship_3"
    WHERE ("concept_relationship_3"."relationship_id" = 'Is a')
  ) AS "concept_relationship_4" ON ("base_5"."concept_id" = "concept_relationship_4"."concept_id_2")
  JOIN "concept" AS "concept_4" ON ("concept_relationship_4"."concept_id_1" = "concept_4"."concept_id")
)
SELECT
  "condition_occurrence_3"."person_id",
  "condition_occurrence_3"."condition_start_date"
FROM (
  SELECT
    "condition_occurrence_2"."person_id",
    "condition_occurrence_2"."condition_start_date",
    (lag(date("condition_occurrence_2"."condition_start_date", '180 days')) OVER (PARTITION BY "condition_occurrence_2"."person_id" ORDER BY "condition_occurrence_2"."condition_start_date")) AS "boundary"
  FROM (
    SELECT
      "condition_occurrence_1"."person_id",
      "condition_occurrence_1"."condition_start_date"
    FROM "condition_occurrence" AS "condition_occurrence_1"
    JOIN "base_1" AS "base_3" ON ("condition_occurrence_1"."condition_concept_id" = "base_3"."concept_id")
    ORDER BY "condition_occurrence_1"."condition_occurrence_id"
  ) AS "condition_occurrence_2"
  WHERE (EXISTS (
    SELECT NULL AS "_"
    FROM "visit_occurrence" AS "visit_occurrence_1"
    JOIN "base_4" AS "base_6" ON ("visit_occurrence_1"."visit_concept_id" = "base_6"."concept_id")
    WHERE
      ("visit_occurrence_1"."person_id" = "condition_occurrence_2"."person_id") AND
      ("condition_occurrence_2"."condition_start_date" BETWEEN "visit_occurrence_1"."visit_start_date" AND "visit_occurrence_1"."visit_end_date")
  ))
) AS "condition_occurrence_3"
WHERE
  ("condition_occurrence_3"."boundary" IS NULL) OR
  ("condition_occurrence_3"."boundary" < "condition_occurrence_3"."condition_start_date")
=#

Merging overlapping intervals

Merging overlapping intervals into a single encompassing period could be done in three steps:

  1. Tag the intervals that start a new period.
  2. Enumerate the periods.
  3. Group the intervals by the period number.

FunSQL lets us encapsulate and reuse this rather complex sequence of transformations.

Merge overlapping visits.

MergeOverlappingIntervals(start_date, end_date) =
    Partition(Get.person_id,
              order_by = [start_date],
              frame = (mode = :rows, start = -Inf, finish = -1)) |>
    Define(:new => Fun.case(start_date .<= Agg.max(end_date), 0, 1)) |>
    Partition(Get.person_id,
              order_by = [start_date, .- Get.new],
              frame = :rows) |>
    Define(:period => Agg.sum(Get.new)) |>
    Group(Get.person_id, Get.period) |>
    Define(:start_date => Agg.min(start_date),
           :end_date => Agg.max(end_date))

q = From(:visit_occurrence) |>
    MergeOverlappingIntervals(Get.visit_start_date, Get.visit_end_date) |>
    Select(Get.person_id, Get.start_date, Get.end_date)

render(conn, q) |> print
#=>
SELECT
  "visit_occurrence_3"."person_id",
  min("visit_occurrence_3"."visit_start_date") AS "start_date",
  max("visit_occurrence_3"."visit_end_date") AS "end_date"
FROM (
  SELECT
    "visit_occurrence_2"."person_id",
    (sum("visit_occurrence_2"."new") OVER (PARTITION BY "visit_occurrence_2"."person_id" ORDER BY "visit_occurrence_2"."visit_start_date", (- "visit_occurrence_2"."new") ROWS UNBOUNDED PRECEDING)) AS "period",
    "visit_occurrence_2"."visit_start_date",
    "visit_occurrence_2"."visit_end_date"
  FROM (
    SELECT
      "visit_occurrence_1"."person_id",
      "visit_occurrence_1"."visit_start_date",
      "visit_occurrence_1"."visit_end_date",
      (CASE WHEN ("visit_occurrence_1"."visit_start_date" <= (max("visit_occurrence_1"."visit_end_date") OVER (PARTITION BY "visit_occurrence_1"."person_id" ORDER BY "visit_occurrence_1"."visit_start_date" ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING))) THEN 0 ELSE 1 END) AS "new"
    FROM "visit_occurrence" AS "visit_occurrence_1"
  ) AS "visit_occurrence_2"
) AS "visit_occurrence_3"
GROUP BY
  "visit_occurrence_3"."person_id",
  "visit_occurrence_3"."period"
=#

DBInterface.execute(conn, q) |> DataFrame
#=>
25×3 DataFrame
 Row │ person_id  start_date  end_date
     │ Int64      String      String
─────┼───────────────────────────────────
   1 │      1780  2008-04-09  2008-04-13
   2 │      1780  2008-11-22  2008-11-22
   3 │      1780  2009-05-22  2009-05-22
   4 │     30091  2008-11-12  2008-11-12
   5 │     30091  2009-07-30  2009-08-07
   6 │     37455  2008-03-18  2008-03-18
   7 │     37455  2008-10-30  2008-10-30
   8 │     37455  2010-08-12  2010-08-12
  ⋮  │     ⋮          ⋮           ⋮
  19 │     95538  2009-09-02  2009-09-02
  20 │    107680  2009-06-07  2009-06-07
  21 │    107680  2009-07-20  2009-07-30
  22 │    110862  2008-09-07  2008-09-16
  23 │    110862  2009-06-30  2009-06-30
  24 │    110862  2009-09-30  2009-10-01
  25 │    110862  2010-06-07  2010-06-07
                          10 rows omitted
=#

Derive a patient's observation periods by merging visits with less than one year gap between them.

MergeIntervalsByGap(start_date, end_date, gap) =
    MergeOverlappingIntervals(start_date, Fun.date(end_date, gap)) |>
    Define(:end_date => Fun.date(Get.end_date, -gap))

q = From(:visit_occurrence) |>
    MergeIntervalsByGap(Get.visit_start_date, Get.visit_end_date, Day(365)) |>
    Select(Get.person_id, Get.start_date, Get.end_date)

render(conn, q) |> print
#=>
SELECT
  "visit_occurrence_3"."person_id",
  min("visit_occurrence_3"."visit_start_date") AS "start_date",
  date(max(date("visit_occurrence_3"."visit_end_date", '365 days')), '-365 days') AS "end_date"
FROM (
  SELECT
    "visit_occurrence_2"."person_id",
    (sum("visit_occurrence_2"."new") OVER (PARTITION BY "visit_occurrence_2"."person_id" ORDER BY "visit_occurrence_2"."visit_start_date", (- "visit_occurrence_2"."new") ROWS UNBOUNDED PRECEDING)) AS "period",
    "visit_occurrence_2"."visit_start_date",
    "visit_occurrence_2"."visit_end_date"
  FROM (
    SELECT
      "visit_occurrence_1"."person_id",
      "visit_occurrence_1"."visit_start_date",
      "visit_occurrence_1"."visit_end_date",
      (CASE WHEN ("visit_occurrence_1"."visit_start_date" <= (max(date("visit_occurrence_1"."visit_end_date", '365 days')) OVER (PARTITION BY "visit_occurrence_1"."person_id" ORDER BY "visit_occurrence_1"."visit_start_date" ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING))) THEN 0 ELSE 1 END) AS "new"
    FROM "visit_occurrence" AS "visit_occurrence_1"
  ) AS "visit_occurrence_2"
) AS "visit_occurrence_3"
GROUP BY
  "visit_occurrence_3"."person_id",
  "visit_occurrence_3"."period"
=#

DBInterface.execute(conn, q) |> DataFrame
#=>
12×3 DataFrame
 Row │ person_id  start_date  end_date
     │ Int64      String      String
─────┼───────────────────────────────────
   1 │      1780  2008-04-09  2009-05-22
   2 │     30091  2008-11-12  2009-08-07
   3 │     37455  2008-03-18  2008-10-30
   4 │     37455  2010-08-12  2010-08-12
   5 │     42383  2009-06-29  2010-04-15
   6 │     69985  2009-01-09  2009-01-09
   7 │     69985  2010-04-17  2010-07-30
   8 │     72120  2008-12-15  2008-12-15
   9 │     82328  2008-10-20  2009-01-25
  10 │     95538  2009-03-30  2009-09-02
  11 │    107680  2009-06-07  2009-07-30
  12 │    110862  2008-09-07  2010-06-07
=#