Publication is the process of materializing block results into external database objects (tables, views).
Define publication
Configuration for publication is specified as metadata properties.
#+src sql salesFact()
#+meta {
  :publication { :type "table", :name "mart_sales_fact" }
}
#+begin
  ...
#+end
When we run this block, a table called “mart_sales_fact” is created in the currently active database/schema.
CoginitiScript supports the following publication types:
- table
- view (see the example after this list)
- file on a filesystem (local filesystem, S3, etc.)
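For instance, publishing a block as a view only requires changing the :type property. Here is a minimal sketch based on the table example above; salesFactView and mart_sales_fact_v are illustrative names:

#+src sql salesFactView()
#+meta {
  :publication { :type "view", :name "mart_sales_fact_v" }
}
#+begin
  ...
#+end

Running this block should create a view named “mart_sales_fact_v” in the currently active database/schema, analogous to the table case.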
Referencing blocks with publication strategy
When we reference a block that has a publication strategy defined, the block itself is not executed; instead, the published data is read from the table, view, etc.
Given the query
SELECT * FROM ${salesFact()};
it will be converted into the following SQL query before it is run:
SELECT * FROM mart_sales_fact;
Published data is refreshed only when blocks with a publication strategy defined are explicitly executed, not when a block is referenced from other blocks (in that case the data is only read from the published object). The user is responsible for configuring the pipeline so that all blocks that need to be published are executed first. Usually this is done as a nightly job configured in our scheduler.
Publication can also be viewed as a caching mechanism that improves the overall performance of transformation pipelines: some blocks are “cached” by publishing their data into tables, and downstream blocks read data from those tables instead of running the dependent block logic over and over again.
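For instance, a downstream block can aggregate over the published table without re-running the fact logic. This is a hypothetical sketch; salesByRegion and the region and amount columns are illustrative:

#+src sql salesByRegion()
#+begin
SELECT region, SUM(amount) AS total_amount
FROM ${salesFact()}
GROUP BY region;
#+end

Because salesFact() has a publication strategy, the reference resolves to mart_sales_fact, so this block reads the published table rather than executing the fact query.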
Incremental publication
Incremental publication allows users to publish only the new data that has become available since the last time publication for the given block was run. To enable incremental publication, set the :incremental property to true. Within the block body, the following built-in variables can be used to control the logic of incremental publication:
- __INCREMENTAL__
  - Used as a predicate to add conditional logic to the SQL query so that rows are filtered out only when incremental publication is enabled.
- __TARGET__
  - Used as a reference to the target (published) dataset, so it can be queried for the values needed to compose a correct filter for incremental publication (for example, to get the latest timestamp in your target table).
#+src sql DAU()
#+meta {
  :publication { :type "table", :name "daily_active_users", :incremental true }
}
#+begin
SELECT
  DATE_TRUNC('day', visit_date) AS date_day,
  COUNT(DISTINCT user_id) AS users_count
FROM visits
#+if __INCREMENTAL__ then
  WHERE visit_date >= (SELECT MAX(date_day) FROM {{ __TARGET__ }})
#+end
GROUP BY date_day;
#+end
Unique key
If a block has a unique key defined as part of its schema, the publication process uses this information to match rows in the target against the incrementally published rows and to update the existing rows that match. For example, if you have a block that calculates daily active users and you want to publish it incrementally, make sure that the date_day column is specified as your unique key. Otherwise, if you run this publication more than once per day, you will get duplicate rows for the given date.
#+src sql DAU()
#+meta {
  :publication { :type "table", :name "daily_active_users", :incremental true },
  :schema { :unique_key ["date_day"] }
}
#+begin
SELECT
  DATE_TRUNC('day', visit_date) AS date_day,
  COUNT(DISTINCT user_id) AS users_count
FROM visits
#+if __INCREMENTAL__ then
  WHERE visit_date >= (SELECT MAX(date_day) FROM {{ __TARGET__ }})
#+end
GROUP BY date_day;
#+end
Update logic for rows based on a unique key works differently on different database platforms. Upsert statements are used when supported; otherwise, a combination of DELETE and INSERT statements is used, executed inside a transaction to guarantee atomicity of the operation.
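On platforms without native upsert support, the generated statements have roughly the following shape. This is a hedged sketch using the DAU example above; incremental_batch stands in for however the engine stages the newly computed rows, which is not specified here, and the actual generated SQL may differ per platform:

-- Hypothetical fallback shape; not the engine's literal output.
BEGIN;

-- Drop target rows that collide with the new batch on the unique key (date_day).
DELETE FROM daily_active_users
WHERE date_day IN (SELECT date_day FROM incremental_batch);

-- Append the freshly published rows.
INSERT INTO daily_active_users (date_day, users_count)
SELECT date_day, users_count
FROM incremental_batch;

COMMIT;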
Incremental publication is not supported for file targets, since it requires either a) querying the target data to get a proper value for the filter condition, or b) an upsert operation to update rows based on the unique key, both of which are problematic for data stored as a raw file on a filesystem.
Use tags to simplify pipeline execution
Tags allow you to easily refer to a collection of blocks. This is useful for creating groups of blocks that need to be executed together in scheduled or triggered runs.
Example:
#+src sql salesFact()
#+meta {
  :publication { :type "table", :name "mart_sales_fact" },
  :tags ["publication"]
}
#+begin
  ...
#+end
Once you have assigned tags to blocks, you can use those tags to run all blocks that carry a given tag.
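As an illustration, assigning the same tag to the incremental DAU block from earlier groups both publications under one tag, so a scheduled run targeting the “publication” tag would refresh both. The exact tag-based run configuration lives in the scheduler and is not shown here:

#+src sql DAU()
#+meta {
  :publication { :type "table", :name "daily_active_users", :incremental true },
  :tags ["publication"]
}
#+begin
  ...
#+end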