Skip to content
Andrey edited this page Dec 8, 2023 · 2 revisions

my-postgres-macros

Macros uses the tokio_postgres cargo package to generate requests; For Date time - DateTimeAsMicroseconds is used

All the attrubutes models are here: https://github.com/MyJetTools/my-postgres/tree/main/my-postgres-macros/src/attributes

All the use cases are going to be presented by simple Client KeyValue use cases.

Let's say we want to save a string key/value for each client

So we have a structure

pub struct KeyValueDto {
    pub client_id: String,
    pub key: String,
    pub value: String,
}

Insert use-cases

#[derive(InsertDbEntity)]
pub struct KeyValueDto {
    pub client_id: String,
    pub key: String,
    pub value: String,
}

this macro generates the method which we can use

let postgres_client: MyPostgres = ...

let insert_entity = KeyValueDto {
            client_id,
            key,
            value,
        };
            
postgres_client.insert_db_entity(&insert_entity, TABLE_NAME)
            .await?;
//With telemetry
           
let my_telemetry: Option<&MyTelemetry> = ..;
postgres_client.insert_db_entity(&insert_entity, TABLE_NAME, my_telemetry)
            .await?;

            

under the hood, it generates and executes SQL code such as:

INSERT INTO {TABLE_NAME} (client_id, key, value) VALUES ($1, $2, $3)

Update use-case

To mark primary keys attr #[primary_key] is used

#[derive(UpdateDbEntity)]
pub struct KeyValueDto {
    #[primary_key]
    pub client_id: String,
    #[primary_key]
    pub key: String,
    pub value: String,
}

which generates the code possible to use:

let postgres_client: MyPostgres = ...

let update_entity = KeyValueDto {
            client_id,
            key,
            value,
        };

postgres_client
            .update_db_entity(&update_entity, TABLE_NAME)
            .await?;
//With telemetry
           
let my_telemetry: Option<&MyTelemetry> = ..;
postgres_client.update_db_entity(&update_entity, TABLE_NAME, my_telemetry)
            .await?;
            

under the hood, it generates and executes code such as

INSERT INTO {TABLE_NAME} SET value = $3 WHERE client_id = $1 AND key = $2,

Insert or Update use-case

To mark primary keys attr #[primary_key] is used

as well as the name of the primary key is required

#[derive(InsertDbEntity, UpdateDbEntity)]
pub struct KeyValueDto {
    #[primary_key]
    pub client_id: String,
    #[primary_key]
    pub key: String,
    pub value: String,
}


let postgres_client: MyPostgres = ...

let insert_or_update_entity = KeyValueDto {
            client_id,
            key,
            value,
        };

postgres_client
            .insert_or_update_db_entity(&insert_or_update_entity, TABLE_NAME, PK_NAME)
            .await?;
// With telemetry
           
let my_telemetry: Option<&MyTelemetry> = ..;
postgres_client.insert_or_update_db_entity(&insert_or_update_entity, TABLE_NAME, PK_NAME, my_telemetry)
            .await?;
            

under the hood, it generates and executes code such as

INSERT INTO {TABLE_NAME} SET value = $3 WHERE client_id = $1 AND key = $2
ON CONFLICT ON CONSTRAINT {PK_NAME} DO UPDATE SET ($3)

Insert if not exists use-case

#[derive(InsertDbEntity)]
pub struct KeyValueDto {
    pub client_id: String,
    pub key: String,
    pub value: String,
}


let postgres_client: MyPostgres = ...

let insert_entity = KeyValueDto {
            client_id,
            key,
            value,
        };

postgres_client
            .insert_db_entity_if_not_exists(&insert_entity, TABLE_NAME, PK_NAME)
            .await?;
// With telemetry
           
let my_telemetry: Option<&MyTelemetry> = ..;
postgres_client.insert_db_entity_if_not_exists(&insert_entity, TABLE_NAME, PK_NAME, my_telemetry)
            .await?;
            

under the hood, it generates and executes code such as

INSERT INTO {TABLE_NAME} SET value = $3 WHERE client_id = $1 AND key = $2
ON CONFLICT DO NOTHING

Delete use-case

#[derive(WhereDbModel)]
pub struct WhereModel {
    pub client_id: String,
    pub key: String,
}

let postgres_client: MyPostgres = ...

let where_dto = WhereModel {
            client_id: ...,
            key: ..,
        };

postgres_client
            .delete_db_entity(&where_dto, TABLE_NAME)
            .await?;
// With telemetry
           
let my_telemetry: Option<&MyTelemetry> = ..;
postgres_client.delete_db_entity(&where_dto, TABLE_NAME, my_telemetry)
            .await?;
            

under the hood, it generates and executes code such as

DELETE TABLE {TABLE_NAME} WHERE client_id = $1 AND key = $2

Select use-case

Requires two types of model:

  • Select Model
  • Where model

to get a vector of rows

#[derive(WhereDbModel)]
pub struct GetInputParam {
    pub client_id: String,
    pub key: String,
}

#[derive(SelectDbEntity)]
pub struct KeyValueDto {
    pub client_id: String,
    pub key: String,
    pub value: String,
}


let postgres_client: MyPostgres = ...

let where_dto = GetInputParam{
  client_id: //Whatever
  key: //Whatever
}

let result: Vec<KeyValueDto>  = postgres_client
            .query_rows(TABLE_NAME, &where_dto)
            .await?;
            
// With Telemetry
let my_telemetry: Option<&MyTelemetry> = ..;
let result: Vec<KeyValueDto>  = postgres_client
            .query_rows(TABLE_NAME, &where_dto, telemetry_context)
            .await?;

under the hood, it generates and executes code such as

SELECT client_id, key, value FROM {TABLE_NAME} WHERE client_id = $1 AND key = $2

to get a single row

#[derive(WhereDbModel)]
pub struct GetInputParam {
    pub client_id: String,
    pub key: String,
}

#[derive(SelectDbEntity)]
pub struct KeyValueDto {
    pub client_id: String,
    pub key: String,
    pub value: String,
}


let postgres_client: MyPostgres = ...

let where_dto = GetInputParam{
  client_id: //Whatever
  key: //Whatever
}

let result: Option<KeyValueDto>  = postgres_client
            .query_single_row(TABLE_NAME, &where_dto)
            .await?;
                        
// With Telemetry
let my_telemetry: Option<&MyTelemetry> = ..;
let result: Option<KeyValueDto>  = postgres_client
            .query_rows(TABLE_NAME, &where_dto, telemetry_context)
            .await?;

Bulk select rows

This use-case is useful if we want to do a similar SQL request and we do not want to pay round-trip TCP expenses.

For instance - we have an authentication case and we want to aggregate requests and execute them in bulk.

line_no - is mandatory field which maps bulk requests with bulk responses

#[derive(BulkSelectDbEntity, SelectDbEntity)]
pub struct BulkSelectKeyValueDto {
    #[line_no]
    pub line_no: i32,
    pub client_id: String,
    pub key: String,
    pub value: String,
}



let select_builder = BulkSelectBuilder::new(TABLE_NAME, keys);

let db_rows: Vec<KeyValueDto> = postgress_client.bulk_query_rows(&select_builder).await?;

or - if let's say we do not want optional values - we can transform them with the request with transformation

#[derive(SelectDbEntity, BulkSelectDbEntity)]
pub struct BulkSelectKeyValueDto {
    #[line_no]
    pub line_no: i32,
    pub client_id: String,
    pub key: String,
    pub value: String,
}

#[derive(SelectDbEntity, InsertOrUpdateDbEntity)]
pub struct KeyValueDto {
    #[primary_key]
    pub client_id: String,
    #[primary_key]
    pub key: String,
    pub value: String,
}

let select_builder = BulkSelectBuilder::new(TABLE_NAME, keys);

let db_rows: Vec<KeyValueDto> = postgress_client
       .bulk_query_rows_with_transformation(
                &select_builder,
                |input, db_row: Option<BulkSelectKeyValueDto>| {
                    if let Some(db_row) = db_row {
                        KeyValueDto {
                            client_id: db_row.client_id,
                            key: db_row.key,
                            value: db_row.value.to_string(),
                        }
                    } else {
                        KeyValueDto {
                            client_id: input.client_id.to_string(),
                            key: input.key.to_string(),
                            value: "".to_string(),
                        }
                    }
                },
               
            )
            .await?;

Group By selecting rows

To Make sure we group_by line is generated, $[sql] and #[group_by] attributes can be used

#[derive(SelectDbEntity)]
pub struct AmountOfAssetsDto {
    #[sql("count(*)::int")]
    pub count: i32,
    #[group_by]
    pub asset_id: String,
}

#[derive(WhereDbModel)]
pub struct WhereDto<'s> {
    pub client_id: &'s str,
}

let result: Vec<AmountOfAssetsDto> = self.postgress
            .query_rows(TABLE_NAME, &where_dto)
            .await
            .unwrap()

would be turned into SQL Statement

SELECT count(*)::int as count,asset_id FROM operations_history WHERE client_id=$1 GROUP BY asset_id

as well - structs of my_postgres::group_by_fields can be used.

Example

use my_postgres::GroupByCount;

#[derive(SelectDbEntity)]
pub struct StatisticsModel {
    #[group_by]
    pub level: LogLevelDto,
    pub count: GroupByCount,
}

Concurrent insert or update

Field e_tag of type i64 is needed;

This field is used to

#[derive(SelectDbEntity, InsertDbEntity, UpdateDbEntity)]
pub struct TestETagDto {
    #[primary_key]
    pub id: i32,

    #[sql_type("timestamp")]
    pub date: DateTimeAsMicroseconds,

    #[db_field_name("etag")]
    #[e_tag]
    pub e_tag: i64,
}

#[derive(WhereDbModel)]
pub struct ETagWHere {
    pub id: i32,
}


let entity = TestETagDto{...}

my_postgres.concurrent_insert_or_update_single_entity(
        "test-table",
        &ETagWHere { id: 1 },
        || {
            Some(TestETagDto {
                id: 1,
                date: DateTimeAsMicroseconds::now(),
                e_tag: 0, // Put random value here. It will be updated when inserted
            })
        },
        |itm| {
            itm.date = DateTimeAsMicroseconds::now();
            true
        },
    );

Clone this wiki locally