Publication

Publication is the process of materializing block results into external database objects (tables, views).

 

Define publication

Configuration for a publication is specified as metadata properties.

#+src sql salesFact()
#+meta {
  :publication {
    :type "table",
    :name "mart_sales_fact"
  }
}
#+begin
 ...
#+end

When we run this block, a table called “mart_sales_fact” is created in the currently active database / schema.

CoginitiScript supports the following publication types:

  • table
  • view
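
For example, publishing a block as a view only requires changing the :type property. A minimal sketch (the block name, view name, and body here are hypothetical):

#+src sql salesFactView()
#+meta {
  :publication {
    :type "view",
    :name "mart_sales_fact_v"
  }
}
#+begin
 ...
#+end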

 

Referencing blocks with publication strategy

When we reference a block that has a publication strategy defined, the block itself is not executed; instead, the published data is read from the corresponding table / view.

The query

SELECT * FROM {{ salesFact() }};

will be converted into the following SQL query before it is run:

SELECT * FROM mart_sales_fact;

Published data is refreshed only when blocks with a publication strategy defined are explicitly executed, not when a block is referenced from other blocks (in that case data is only read from the published object). The user is responsible for configuring the pipeline so that all blocks that need to be published are executed first. Usually this is done as a nightly job configured in our scheduler.

Publication can also be viewed as a caching mechanism that improves the overall performance of transformation pipelines: some blocks are “cached” by publishing their data into tables, and downstream blocks read data from those tables instead of running the dependent block logic over and over again.

 

Executing Publication

To execute a publication programmatically, CoginitiScript provides an embedded package from our standard library. The example below runs publications for the individual blocks sales.SalesDetail and sales.SalesHeader.

#+import "std/publication"
#+import "rn/sales"

{{
  publication.Run(
    blocks=[sales.SalesDetail, sales.SalesHeader]
  )
}}

All blocks specified as targets for execution within publication.Run will be executed respecting their dependencies. For example, if the sales.SalesDetail block depends on the sales.SalesHeader block, the system will execute sales.SalesHeader first and only after that sales.SalesDetail.
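
Such a dependency typically arises when one block references the other in its body. A minimal sketch, assuming both blocks live in the sales package (the publication names and block bodies are illustrative):

#+src sql SalesHeader()
#+meta {
  :publication { :type "table", :name "sales_header" }
}
#+begin
 ...
#+end

#+src sql SalesDetail()
#+meta {
  :publication { :type "table", :name "sales_detail" }
}
#+begin
  SELECT * FROM {{ SalesHeader() }};
#+end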

 

The order of execution is respected only within a single publication.Run call.

 

If you want your publication to be refreshed on a regular basis, you can save the snippet above as a catalog item and schedule its execution using the built-in scheduler in the Coginiti application.

 

The following functionality will be available in the next version of CoginitiScript.

 

publication.Run accepts three parameters in total:

  1. blocks - list of block identifiers to publish
  2. packages - list of packages to publish
  3. postOp - identifier of block which should be executed as a post operation for each publication target

Below is a more complex example where we run publications for the blocks sales.SalesDetail and sales.SalesHeader, as well as all publications the system finds in the packages location and geography. For each publication we also want to execute a post operation that grants SELECT permissions on the created target.

#+import "std/publication"
#+import "rn/sales"
#+import "rn/location"
#+import "rn/geography"

{{
  publication.Run(
    blocks=[sales.SalesDetail, sales.SalesHeader],
    packages=[location, geography],
    postOp=grantSelectPermissions
  )
}}

#+src sql grantSelectPermissions()
#+begin
  GRANT SELECT ON {{ publication.Target() }} TO GROUP "allusers@coginiti.co";
#+end

The :postOp argument allows you to specify a script to be executed after each block publication. In the given example we use it to GRANT SELECT permissions on each target we publish. Please note that within this script the publication.Target() builtin block is available for referencing the target table of the publication.

 

Incremental publication

Incremental publication allows users to publish only the new data that has become available since the last time the publication for the given block was run. To enable incremental publication, set the :incremental property to true. Within the block body, the following builtin blocks can be used to control the logic of an incremental publication:

publication.Incremental()
Used as a predicate to add conditional logic to the SQL query, filtering out rows based on some condition when incremental publication is enabled.

publication.Target()
Used as a reference to the target (published) dataset, so you can query it to get the values needed to compose a correct filter for incremental publication (for example, to get the latest timestamp in your target table).

#+import "std/publication"

#+src sql DAU()
#+meta {
  :publication {
    :type "table",
    :name "daily_active_users",
    :incremental true
  }
}
#+begin
  SELECT
    DATE_TRUNC('day', visit_date) as date_day,
    COUNT(DISTINCT user_id) AS users_count
  FROM
    visits
  #+if publication.Incremental() then
    WHERE visit_date >= (SELECT MAX(date_day) FROM {{ publication.Target() }})
  #+end
  GROUP BY
    date_day
  ;
#+end

 

Unique key

If a block has a unique key defined as part of its schema, then the publication process will use this information to match rows in the target with the incrementally published rows and update the existing rows that match. For example, if you have a block that calculates daily active users and you want to publish it incrementally, make sure that the date_day column is specified as your unique key. Otherwise, if you run this publication more than once per day, you will get duplicate rows for the given date.

#+import "std/publication"

#+src sql DAU()
#+meta {
  :publication {
    :type "table",
    :name "daily_active_users",
    :incremental true
  },
  :schema {
    :unique_key ["date_day"]
  }
}
#+begin
  SELECT
    DATE_TRUNC('day', visit_date) as date_day,
    COUNT(DISTINCT user_id) AS users_count
  FROM
    visits
  #+if publication.Incremental() then
    WHERE visit_date >= (SELECT MAX(date_day) FROM {{ publication.Target() }})
  #+end
  GROUP BY
    date_day
  ;
#+end

Update logic for rows based on a unique key works differently on different database platforms. Upsert statements are used when supported; otherwise a combination of DELETE and INSERT statements is used, executed inside a transaction to guarantee atomicity of the operation.
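
As a rough illustration of the fallback path, the generated statements are equivalent to something like the following (staged_increment is a hypothetical name standing in for the newly computed rows; the actual SQL is generated by the publication engine):

BEGIN;
-- Remove target rows whose unique key matches an incoming row
DELETE FROM daily_active_users
WHERE date_day IN (SELECT date_day FROM staged_increment);
-- Insert the freshly published rows
INSERT INTO daily_active_users (date_day, users_count)
SELECT date_day, users_count FROM staged_increment;
COMMIT;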

Incremental publication is not supported for file targets, since it requires either a) querying the target data to get a proper value for the filter condition, or b) an upsert operation to update rows based on the unique key, which is problematic for data represented as a raw file on a file system.

 

Use tags to simplify pipeline execution

Tags allow you to easily refer to a collection of blocks. This can be useful for creating groups of blocks that need to be executed together when scheduling or triggering runs.

Example:

#+src sql salesFact()
#+meta {
  :publication {
    :type "table",
    :name "mart_sales_fact"
  },
  :tags ["publication"]
}
#+begin
 ...
#+end

Once you have assigned tags to blocks, you can use those tags to run all blocks that share a given tag.
