Publication (In Private Beta)

Publication is the process of materializing block results into external database objects (tables, views).

 

Define a publication

Configuration for publication is specified as metadata properties:

#+src sql salesFact()
#+meta {
  :publication {
    :type "table",
    :name "mart_sales_fact"
  }
}
#+begin
 ...
#+end

When we run this block, a table called “mart_sales_fact” will be created in the currently active database / schema.
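
Under the hood, this is roughly equivalent to running a CREATE TABLE ... AS SELECT over the block body (a sketch; the exact DDL is generated by the engine and depends on the target platform):

CREATE TABLE mart_sales_fact AS
SELECT ...; -- the block body goes here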

CoginitiScript supports the following publication types:

  • table
  • view (see the sketch after this list)
  • file on a filesystem (local filesystem, S3, etc.)
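
For instance, publishing the same block as a view should only require changing the :type value (a minimal sketch, assuming "view" is accepted exactly as "table" is in the example above):

#+src sql salesFact()
#+meta {
  :publication {
    :type "view",
    :name "mart_sales_fact"
  }
}
#+begin
 ...
#+end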

 

Referencing blocks with a publication strategy

When we reference a block which has a publication strategy defined, the block itself will not be executed; instead, the published data will be read from the table / view / etc.

Given the query

SELECT * FROM ${salesFact()};

it will be converted into the following SQL query before it is run:

SELECT * FROM mart_sales_fact;

Published data is refreshed only when blocks with a publication strategy defined are explicitly executed, not when a block is referenced from other blocks (in that case data is only read from the published object). The user is responsible for configuring the pipeline so that all blocks that need to be published are executed first. Usually this is done as a nightly job configured in our scheduler.

Publication can also be viewed as a caching mechanism that improves the overall performance of transformation pipelines: some blocks are “cached” by publishing their data into tables, and downstream blocks read data from those tables instead of running the dependent block logic over and over again.

 

Incremental publication

Incremental publication allows users to publish only the new data that has become available since the last time publication for the given block was run. To enable incremental publication, set the :incremental property to true. Within the block body, the following built-in variables can be used to control the logic of incremental publication:

  • __INCREMENTAL__: used as a predicate to add conditional logic to the SQL query, filtering rows based on some condition when incremental publication is enabled.
  • __TARGET__: a reference to the target (published) dataset; query it to obtain the values needed to compose a correct filter for incremental publication (for example, the latest timestamp in your target table).

Example:
#+src sql DAU()
#+meta {
  :publication {
    :type "table",
    :name "daily_active_users",
    :incremental true
  }
}
#+begin
  SELECT
    DATE_TRUNC('day', visit_date) AS date_day,
    COUNT(DISTINCT user_id) AS users_count
  FROM
    visits
  #+if __INCREMENTAL__ then
    WHERE visit_date >= (SELECT MAX(date_day) FROM {{ __TARGET__ }})
  #+end
  GROUP BY
    date_day
  ;
#+end
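
On an incremental run, the block above compiles to roughly the following SQL, with the __INCREMENTAL__ branch kept and __TARGET__ resolved to the published table (a sketch; the exact compiled text may differ):

SELECT
  DATE_TRUNC('day', visit_date) AS date_day,
  COUNT(DISTINCT user_id) AS users_count
FROM
  visits
WHERE visit_date >= (SELECT MAX(date_day) FROM daily_active_users)
GROUP BY
  date_day
;

On a full (non-incremental) run, the WHERE clause is simply omitted.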

 

Unique key

If a block has a unique key defined as part of its schema, the publication process will use this information to match rows in the target with the incrementally published rows and update the existing rows that match. For example, if you have a block which calculates daily active users and you want to publish it incrementally, make sure that you have the date_day column specified as your unique key. Otherwise, if you run this publication more than once per day, you will get duplicate rows for the given date.

#+src sql DAU()
#+meta {
  :publication {
    :type "table",
    :name "daily_active_users",
    :incremental true
  },
  :schema {
    :unique_key ["date_day"]
  }
}
#+begin
  SELECT
    DATE_TRUNC('day', visit_date) AS date_day,
    COUNT(DISTINCT user_id) AS users_count
  FROM
    visits
  #+if __INCREMENTAL__ then
    WHERE visit_date >= (SELECT MAX(date_day) FROM {{ __TARGET__ }})
  #+end
  GROUP BY
    date_day
  ;
#+end

Update logic for rows based on a unique key works differently on different database platforms. Upsert statements are used when supported; otherwise, a combination of DELETE and INSERT statements is used, executed inside a transaction to guarantee atomicity of the operation.
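
As an illustration, on a platform without a native upsert the engine might emit statements along these lines for the DAU example (a simplified sketch; new_rows is a hypothetical name for the freshly computed incremental result, not an actual CoginitiScript object):

BEGIN;
-- delete target rows that collide with incoming rows on the unique key
DELETE FROM daily_active_users
  WHERE date_day IN (SELECT date_day FROM new_rows);
-- insert the incoming rows
INSERT INTO daily_active_users (date_day, users_count)
  SELECT date_day, users_count FROM new_rows;
COMMIT;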

Incremental publication is not supported for file targets, since it requires either a) querying the target data to obtain a proper value for the filter condition, or b) an upsert operation to update rows based on the unique key, both of which are problematic for data stored as a raw file on a filesystem.

 

Use tags to simplify pipeline execution

Tags allow you to easily refer to a collection of blocks. This can be useful for creating groups of blocks that need to be executed together when scheduling or triggering runs.

Example:

#+src sql salesFact()
#+meta {
  :publication {
    :type "table",
    :name "mart_sales_fact"
  },
  :tags ["publication"]
}
#+begin
 ...
#+end

Once you have assigned tags to blocks, you can use a tag to run all of the blocks that carry it.
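
For example, assigning the same "publication" tag to the DAU() block groups it with salesFact() above, so a single scheduled run of that tag refreshes both (a sketch reusing the blocks defined earlier):

#+src sql DAU()
#+meta {
  :publication {
    :type "table",
    :name "daily_active_users",
    :incremental true
  },
  :schema {
    :unique_key ["date_day"]
  },
  :tags ["publication"]
}
#+begin
 ...
#+end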
