Publication is the process of materializing block results into external database objects (tables, views).
Define publication
Configuration for a publication is specified as metadata properties.
#+src sql salesFact()
#+meta {
  :publication { :type "table", :name "mart_sales_fact" }
}
#+begin
  ...
#+end
When we run this block, a table called “mart_sales_fact” will be created in the currently active database / schema.
CoginitiScript supports the following publication types:
- table
- view
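As a minimal sketch, publishing a block as a view only requires changing the :type property. The block, view, and column names below are illustrative, not taken from the examples above:
#+src sql activeCustomers()
#+meta {
  :publication { :type "view", :name "v_active_customers" }
}
#+begin
  SELECT customer_id, customer_name
  FROM customers
  WHERE is_active = true
#+end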
Referencing blocks with a publication strategy
When we reference a block that has a publication strategy defined, the block itself is not executed; instead, the published data is read from the table / view.
The query
SELECT * FROM {{ salesFact() }};
will be converted into the following SQL before it is run:
SELECT * FROM mart_sales_fact;
Published data is refreshed only when blocks with a publication strategy defined are explicitly executed, not when a block is referenced from other blocks (in that case data is only read from the published object). The user is responsible for configuring the pipeline so that all blocks that need to be published are executed first. Usually this is done as a nightly job configured in our scheduler.
Publication can also be viewed as a caching mechanism that improves the overall performance of transformation pipelines: some blocks are “cached” by publishing their data into tables, and downstream blocks read data from those tables instead of running the dependent block logic over and over again.
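For illustration, a downstream block could look like the sketch below, assuming the salesFact() block from the example above is in scope (the order_date and amount columns are illustrative). The reference resolves to a read from mart_sales_fact rather than re-running the salesFact logic:
#+src sql dailyRevenue()
#+begin
  SELECT order_date, SUM(amount) AS revenue
  FROM {{ salesFact() }}  -- reads from mart_sales_fact because salesFact has a publication defined
  GROUP BY order_date
#+end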
Executing a publication
To execute publications programmatically, CoginitiScript provides an embedded package in our standard library. The example below runs publications for the individual blocks sales.SalesDetail and sales.SalesHeader.
#+import "std/publication" #+import "rn/sales" {{ publication.Run( blocks=[sales.SalesDetail, sales.SalesHeader] ) }}
All blocks specified as targets for execution within publication.Run will be executed respecting their dependencies. For example, if the sales.SalesDetail block depends on the sales.SalesHeader block, the system will execute sales.SalesHeader first and only then sales.SalesDetail.
The order of execution is respected only within a single publication.Run call.
If you want your publication to be refreshed on a regular basis, you can save the snippet above as a catalog item and schedule its execution using the built-in scheduler in the Coginiti application.
publication.Run accepts three parameters in total:
- blocks - list of block identifiers to publish
- packages - list of packages to publish
- postOp - identifier of a block to execute as a post operation for each publication target
Below is a more complex example where we run publications for the blocks sales.SalesDetail and sales.SalesHeader, as well as all publications the system finds in the packages location and geography. For each publication we also want to execute a post operation that grants SELECT permissions on the created target.
#+import "std/publication" #+import "rn/sales" #+import "rn/location" #+import "rn/geography" {{ publication.Run( blocks=[sales.SalesDetail, sales.SalesHeader], packages=[location, geography], postOp=grantSelectPermissions ) }} #+src sql grantSelectPermissions() #+begin GRANT SELECT ON {{ publication.Target() }} TO GROUP "allusers@coginiti.co"; #+end
The :postOp argument allows you to specify a script to be executed after each block publication. In the example above we use it to grant SELECT permissions on each target we publish. Please note that within this script the publication.Target() builtin block is available for referencing the target table of the publication.
Coming soon
Incremental publication
Incremental publication allows users to publish only the new data that has become available since the last time the publication for the given block was run. To enable incremental publication, set the :incremental property to true. Within the block body the following builtin blocks can be used to control the logic of an incremental publication:
- publication.Incremental() - used as a predicate to add conditional logic to the SQL query that filters out rows when incremental publication is enabled.
- publication.Target() - used as a reference to the target (published) dataset, so it can be queried for the values needed to compose a correct filter for the incremental publication (for example, the latest timestamp in your target table).
#+import "std/publication"
#+src sql DAU()
#+meta {
  :publication { :type "table", :name "daily_active_users", :incremental true }
}
#+begin
  SELECT
    DATE_TRUNC('day', visit_date) AS date_day,
    COUNT(DISTINCT user_id) AS users_count
  FROM visits
  #+if publication.Incremental() then
    WHERE visit_date >= (SELECT MAX(date_day) FROM {{ publication.Target() }})
  #+end
  GROUP BY date_day;
#+end
Unique key
If a block has a unique key defined as part of its schema, the publication process will use this information to match rows in the target with the incrementally published rows and update the existing rows that match. For example, if you have a block that calculates daily active users and you want to publish it incrementally, make sure that the date_day column is specified as your unique key. Otherwise, if you run this publication more than once per day, you will get duplicate rows for the given date.
#+import "std/publication"

#+src sql DAU()
#+meta {
  :publication { :type "table", :name "daily_active_users", :incremental true },
  :schema { :unique_key ["date_day"] }
}
#+begin
  SELECT
    DATE_TRUNC('day', visit_date) AS date_day,
    COUNT(DISTINCT user_id) AS users_count
  FROM visits
  #+if publication.Incremental() then
    WHERE visit_date >= (SELECT MAX(date_day) FROM {{ publication.Target() }})
  #+end
  GROUP BY date_day;
#+end
The update logic for rows based on a unique key works differently on different database platforms. Upsert statements are used when supported; otherwise a combination of DELETE and INSERT statements is used, executed inside a transaction to guarantee atomicity of the operation.
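As a rough illustration only (not the exact SQL the engine generates), the DELETE and INSERT fallback for the daily_active_users example above could look like the following, where incremental_rows is a hypothetical stand-in for the freshly computed incremental batch:
BEGIN;

-- remove target rows that collide with the incoming batch on the unique key (date_day)
DELETE FROM daily_active_users
WHERE date_day IN (SELECT date_day FROM incremental_rows);

-- insert the incrementally published rows
INSERT INTO daily_active_users (date_day, users_count)
SELECT date_day, users_count
FROM incremental_rows;

COMMIT;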
Incremental publication is not supported for file targets, since it requires either a) querying the target data to get a proper value for the filter condition, or b) an upsert operation to update rows based on the unique key, both of which are problematic for data stored as a raw file on a file system.