Publication

Publication is the process of materializing block results as an external database object (table or view) or a data file (Parquet or CSV).

 

Define publication

Publication is configured through metadata properties on the block. Publishing to the database requires two metadata properties: a type definition and a name for the resulting object.

Publish to Table

#+src sql salesFact()
#+meta {
  :publication {
    :type "table",
    :name "mart_sales_fact"
  }
}
#+begin
 ...
#+end

When we run this block, a table called “mart_sales_fact” will be created in the currently active database / schema.
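Views are also listed among the supported database objects. The following is a minimal sketch of a view publication, assuming it takes the same two properties and that the type value is "view" (the value publication.Type() reports for views). The block name and object name are hypothetical.

```sql
#+src sql salesFactView()
#+meta {
  :publication {
    :type "view", -- assumed type value for a view publication
    :name "mart_sales_fact_v" -- hypothetical object name
  }
}
#+begin
  SELECT * FROM mart_sales_fact;
#+end
```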

Publishing to file requires two metadata properties, a type definition and path, and supports a number of configuration options.

Note: running a publication to file interactively in Coginiti Team returns the results to the browser.

Publish to CSV

#+src sql salesFact()
#+meta {
  :publication {
    :type "csv",
    :path "/path/to/file_name", -- mandatory
    :connection "Object Store Connection Name", -- optional, default is local
    :options { -- optional
      :delimiter ",",
      :null_value "",
      :quote_char "",
      :overwrite false,
      :header false,
      :encoding "UTF-8",
      :compression "NONE"
    }
  }
}
#+begin
 ...
#+end

Be aware that on Windows the folders in a path are separated by a backslash (\), which is the escape character inside a regular string. CoginitiScript tries to interpret any escape sequences it finds and fails with a corresponding error. The best way to avoid this is to use a raw string instead of a regular string. Raw strings are enclosed in backticks (`). Inside a raw string, escape sequences such as \t and \n (or any other sequence) have no special meaning; they are read as a backslash followed by t and a backslash followed by n (or any other character).

Example:

:path `C:\Projects\Reports\sales_report.csv`

 

If you still want to use a regular string to define a path on Windows, you have to escape each backslash with another backslash.

:path "C:\\Projects\\Reports\\sales_report.csv" 
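For context, a complete CSV publication block using a raw-string path might look like the sketch below. The block name, the query, and the choice of options are illustrative only. The option names come from the CSV example above, and the use of true as a boolean value is an assumption based on the false values shown there.

```sql
#+src sql salesReport()
#+meta {
  :publication {
    :type "csv",
    :path `C:\Projects\Reports\sales_report.csv`, -- raw string: backslashes are taken literally
    :options {
      :header true, -- assumed boolean literal, mirrors the "false" shown earlier
      :overwrite true
    }
  }
}
#+begin
  SELECT * FROM mart_sales_fact;
#+end
```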

 

Publish to Parquet

#+src sql salesFact()
#+meta {
  :publication {
    :type "parquet",
    :path "/path/to/file_name",
    :connection "Object Store Connection Name", -- optional, default is local
    :options {
      :row_group_size 1024, -- optional, default is 134217728 bytes (128 MiB)
      :page_size 1024, -- optional, default is 1048576 bytes (1 MiB)
      :overwrite false,
      :compression "NONE" -- snappy, gzip, none
    }
  }
}
#+begin
 ...
#+end

 

Referencing blocks with a database publication strategy

When we reference a block that has a database publication strategy defined, the block itself is not executed; instead, the published data is read from the table or view.

For example, the query

SELECT * FROM {{ salesFact() }};

is converted into the following SQL query before it is run:

SELECT * FROM mart_sales_fact;

NOTE: Referencing blocks with publication to file (CSV / Parquet) is not supported. Doing so results in a runtime error from CoginitiScript.

Published data is refreshed only when blocks with a publication strategy defined are explicitly executed, not when a block is referenced from other blocks (in that case the data is only read from the published object). The user is responsible for configuring the pipeline so that all blocks that need to be published are executed first. Usually this can be done as a nightly job configured in our scheduler.

Publication to the database can also be viewed as a caching mechanism that improves the overall performance of transformation pipelines: some blocks are “cached” by publishing their data into tables, and downstream blocks read from those tables instead of running the dependent block logic over and over again.
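As a sketch of this caching pattern, a downstream block can aggregate over the published salesFact block without re-running its query. The block name salesByDay and the columns order_date and amount are hypothetical and are not part of the original example.

```sql
#+src sql salesByDay()
#+begin
  -- The reference below resolves to the published table mart_sales_fact,
  -- so the salesFact query itself is not executed again.
  SELECT
    order_date, -- hypothetical column
    SUM(amount) AS total_amount -- hypothetical column
  FROM {{ salesFact() }}
  GROUP BY order_date;
#+end
```

The result is only as fresh as the last explicit run of the salesFact publication.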

Executing Publications

To execute publications programmatically, CoginitiScript provides an embedded package in its standard library. The example below runs publications for the individual blocks sales.SalesDetail and sales.SalesHeader.

#+import "std/publication"
#+import "rn/sales"

{{
  publication.Run(
    blocks=[sales.SalesDetail, sales.SalesHeader]
  )
}}

All blocks specified as targets within publication.Run are executed in an order that respects their dependencies. For example, if the sales.SalesDetail block depends on the sales.SalesHeader block, the system executes sales.SalesHeader first and sales.SalesDetail only after that.

 

The order of execution is respected only within a single publication.Run call.

 

If you want your publication to be refreshed on a regular basis, you can save the snippet above as a catalog item and schedule its execution using the built-in scheduler in the Coginiti application.

 

publication.Run accepts three parameters in total:

  1. blocks - list of block identifiers to publish
  2. packages - list of packages to publish
  3. postOp - identifier of block which should be executed as a post operation for each publication target (it should be defined as having void result)

Below is a more complex example where we run publications for the blocks sales.SalesDetail and sales.SalesHeader and for all publications that the system finds in the packages location and geography. For each publication we also execute a post operation that grants SELECT permissions on the created target.

#+import "std/publication"
#+import "rn/sales"
#+import "rn/location"
#+import "rn/geography"

{{
  publication.Run(
    blocks=[sales.SalesDetail, sales.SalesHeader],
    packages=[location, geography],
    postOp=grantSelectPermissions
  )
}}

#+src sql grantSelectPermissions(): void
#+begin
  GRANT SELECT ON {{ publication.Target() }} TO GROUP "allusers@coginiti.co";
#+end

The postOp argument specifies a script to be executed after each block publication. In the given example we use it to GRANT SELECT permissions on each target we publish. Note that inside this script the publication.Target() builtin block is available for referencing the target table of the publication.

 

Applying postOp conditionally based on the publication target

When you publish a package that defines publications for both tables/views and CSV/Parquet files, you might want to apply postOp to table/view targets but not to files. To do this, add a condition based on the publication.Type() builtin, which returns the type of the publication to which postOp is being applied: “table”, “view”, “csv”, or “parquet”.

#+src sql grantSelectPermissions(): void
#+begin
  #+if publication.Type() == "table" || publication.Type() == "view" then
    GRANT SELECT ON {{ publication.Target() }} TO GROUP "allusers@coginiti.co";
  #+end
#+end

 

Incremental publication

Incremental publication allows users to publish only the data that has become available since the last time the publication for the given block was run. To enable incremental publication, specify the :incremental property and set the type of incremental publication as its value. We support three incremental strategies:

  • append
  • merge
  • merge_conditionally

Within the block body, the following builtin blocks can be used to control the logic of incremental publication:

publication.Incremental()
Used as a predicate to add conditional logic to the SQL query to filter out rows based on some condition if incremental publication is enabled.
publication.Target()
Used as a reference to the target (published) dataset to query it and get needed values to compose a correct filter for incremental publication (for example, to get the latest timestamp in your target table).

Append publication

With this type, the rows returned by the publication block get appended to the target table.

#+import "std/publication"

#+src sql DAU()
#+meta {
  :publication {
    :type "table",
    :name "daily_active_users",
    :incremental "append"
  }
}
#+begin
  SELECT
    DATE_TRUNC('day', visit_date) as date_day,
    COUNT(DISTINCT user_id) AS users_count
  FROM visits
  #+if publication.Incremental() then
  WHERE visit_date >= (SELECT MAX(date_day) FROM {{ publication.Target() }})
  #+end
  GROUP BY date_day
  ;
#+end

 

 

 

Merge publication

Merge incremental publication should be used to update the target with new and updated rows based on a defined unique key. In the given example, we have a block that calculates daily active users and publishes them incrementally. The date_day column is specified as the unique key and the incremental strategy is set to "merge". This lets us insert new data and also update already published dates if the incremental batch contains updated values.

#+src sql DAU()
#+meta {
  :publication {
    :type "table",
    :name "daily_active_users",
    :incremental "merge",
    :unique_key ["date_day"]
  }
}
#+begin
  SELECT
    DATE_TRUNC('day', visit_date) as date_day,
    COUNT(DISTINCT user_id) AS users_count
  FROM visits
  #+if publication.Incremental() then
  WHERE visit_date >= (SELECT MAX(date_day) FROM {{ publication.Target() }})
  #+end
  GROUP BY date_day
  ;
#+end

The update logic for rows matched by unique key works differently on different database platforms. MERGE statements are used when supported; otherwise, a combination of DELETE and INSERT statements is used, executed inside a transaction to guarantee the atomicity of the operation.
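To make the DELETE and INSERT fallback concrete, here is a rough sketch of what it could look like for the daily_active_users example. This is an illustration of the general technique, not the SQL that Coginiti actually generates. The staging table dau_batch is a hypothetical name for the rows returned by the incremental run, and the transaction keywords vary between platforms.

```sql
-- Illustration only: "dau_batch" is a hypothetical staging table that holds
-- the rows returned by the incremental run; the generated SQL may differ.
BEGIN;

-- Remove target rows whose unique key appears in the new batch ...
DELETE FROM daily_active_users
WHERE date_day IN (SELECT date_day FROM dau_batch);

-- ... then insert the whole batch, which covers both new and updated dates.
INSERT INTO daily_active_users (date_day, users_count)
SELECT date_day, users_count
FROM dau_batch;

COMMIT;
```

Because both statements run in one transaction, readers of the target never observe the state in which the old rows are deleted but the new ones are not yet inserted.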

Incremental publication is not supported for file targets, since it requires either a) querying the target data to get a proper value for the filter condition or b) a merge operation to update rows based on a unique key, which is problematic for data stored as a raw file on a file system.

 

Merge with additional conditions

Sometimes when doing a merge operation we want to update existing rows only when there are changes in particular column values. The "merge_conditionally" incremental strategy allows us to do this. When selecting this strategy, the user must define the "unique_key" and "update_on_changes_in" metadata properties. The unique key is used to match rows in the target, and "update_on_changes_in" contains a list of columns that are compared between the incremental batch and the target. In the example below, the matched row in the target is updated only if changes are found in the "first_name", "last_name", or "email" columns.

#+src sql Persona()
#+meta {
  :publication {
    :type "table",
    :name "dim_persona",
    :schema "base",
    :incremental "merge_conditionally",
    :unique_key ["persona_key"],
    :update_on_changes_in ["first_name", "last_name", "email"]
  }
}
#+begin
  SELECT
    {{ SurrogateKey("src.customer_id") }} AS persona_key,
    src.first_name,
    src.last_name,
    src.email,
    b.batch_number
  FROM
    {{ sources.Customer() }} src
    CROSS JOIN {{ BatchNumber() }} b
  ;
#+end
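Conceptually, a conditional merge of this kind corresponds to a MERGE statement with an extra predicate on the matched branch. The sketch below is an assumption about the general shape only, not the statement Coginiti generates. The name persona_batch is a hypothetical stand-in for the incremental batch, and which columns get updated on a match is likewise assumed.

```sql
-- Illustration only: "persona_batch" is a hypothetical name for the incremental batch.
MERGE INTO base.dim_persona AS tgt
USING persona_batch AS src
  ON tgt.persona_key = src.persona_key
WHEN MATCHED AND (
       tgt.first_name <> src.first_name
    OR tgt.last_name  <> src.last_name
    OR tgt.email      <> src.email
  ) THEN
  UPDATE SET
    first_name   = src.first_name,
    last_name    = src.last_name,
    email        = src.email,
    batch_number = src.batch_number -- assumed: all non-key columns are updated
WHEN NOT MATCHED THEN
  INSERT (persona_key, first_name, last_name, email, batch_number)
  VALUES (src.persona_key, src.first_name, src.last_name, src.email, src.batch_number);
```

Note that a plain <> comparison does not detect a change to or from NULL, so a real implementation would need NULL-safe comparisons for nullable columns.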