Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions openmeter/customer/adapter/customer.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,14 +382,22 @@ func (a *adapter) GetCustomerByUsageAttribution(ctx context.Context, input custo

query := repo.db.Customer.Query().
Where(customerdb.Namespace(input.Namespace)).
Where(customerdb.HasSubjectsWith(
customersubjectsdb.SubjectKey(input.SubjectKey),
customersubjectsdb.Or(
customersubjectsdb.DeletedAtIsNil(),
customersubjectsdb.DeletedAtGT(now),
Where(
customerdb.Or(
// We lookup the customer by subject key in the subjects table
customerdb.HasSubjectsWith(
customersubjectsdb.SubjectKey(input.SubjectKey),
customersubjectsdb.Or(
customersubjectsdb.DeletedAtIsNil(),
customersubjectsdb.DeletedAtGT(now),
),
),
// Or else we lookup the customer by key in the customers table
customerdb.Key(input.SubjectKey),
),
)).
).
Where(customerdb.DeletedAtIsNil())

query = withSubjects(query, now)
query = withSubscription(query, now)

Expand Down
32 changes: 28 additions & 4 deletions openmeter/streaming/clickhouse/meter_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestQueryMeter(t *testing.T) {
windowSize := meter.WindowSizeHour

tests := []struct {
testName string
query queryMeter
wantSQL string
wantArgs []interface{}
Expand Down Expand Up @@ -50,6 +51,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1", []string{"subject1"}, from.Unix(), to.Unix()},
},
{ // Aggregate all available data
testName: "aggregate all available data",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -69,6 +71,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1"},
},
{ // Aggregate with count aggregation
testName: "aggregate with count aggregation",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -87,6 +90,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1"},
},
{ // Aggregate with LATEST aggregation
testName: "aggregate with LATEST aggregation",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -106,6 +110,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1"},
},
{ // Aggregate data from start
testName: "aggregate data from start",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -126,6 +131,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1", from.Unix()},
},
{ // Aggregate data between period
testName: "aggregate data between period",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -147,6 +153,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
},
{ // Aggregate data between period, groupped by window size
testName: "aggregate data between period, groupped by window size",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -169,6 +176,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
},
{ // Aggregate data between period in a different timezone, groupped by window size
testName: "aggregate data between period in a different timezone, groupped by window size",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -192,6 +200,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
},
{ // Aggregate data for a single subject
testName: "aggregate data for a single subject",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -213,6 +222,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1", []string{"subject1"}},
},
{ // Aggregate data for a single subject and group by additional fields
testName: "aggregate data for a single subject and group by additional fields",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -234,6 +244,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1", []string{"subject1"}},
},
{ // Aggregate data for a multiple subjects
testName: "aggregate data for a multiple subjects",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -255,6 +266,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1", []string{"subject1", "subject2"}},
},
{ // Select customer ID
testName: "select customer ID",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand Down Expand Up @@ -294,7 +306,8 @@ func TestQueryMeter(t *testing.T) {
wantSQL: "WITH map('subject1', 'customer1', 'subject2', 'customer2') as subject_to_customer_id SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, subject_to_customer_id[om_events.subject] AS customer_id FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.subject IN (?) GROUP BY customer_id",
wantArgs: []interface{}{"my_namespace", "event1", []string{"subject1", "subject2"}},
},
{ // Filter by customer ID without group by
{ // Filter by customer without group by
testName: "filter by customer ID without group by",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -313,6 +326,7 @@ func TestQueryMeter(t *testing.T) {
},
ID: "customer1",
},
Key: lo.ToPtr("customer-key-1"),
UsageAttribution: customer.CustomerUsageAttribution{
SubjectKeys: []string{"subject1"},
},
Expand All @@ -330,10 +344,18 @@ func TestQueryMeter(t *testing.T) {
},
},
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.subject IN (?)",
wantArgs: []interface{}{"my_namespace", "event1", []string{"subject1", "subject2"}},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.subject IN (?)",
wantArgs: []interface{}{"my_namespace", "event1", []string{
// Only the first customer has a key
"customer-key-1",
// Usage attribution subjects of the first customer
"subject1",
// Usage attribution subjects of the second customer
"subject2",
}},
},
{ // Filter by both customer and subject
testName: "filter by both customer and subject",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand Down Expand Up @@ -384,6 +406,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1"},
},
{ // Aggregate data with filtering for a single group and multiple values
testName: "aggregate data with filtering for a single group and multiple values",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -404,6 +427,7 @@ func TestQueryMeter(t *testing.T) {
wantArgs: []interface{}{"my_namespace", "event1"},
},
{ // Aggregate data with filtering for multiple groups and multiple values
testName: "aggregate data with filtering for multiple groups and multiple values",
query: queryMeter{
Database: "openmeter",
EventsTableName: "om_events",
Expand All @@ -426,7 +450,7 @@ func TestQueryMeter(t *testing.T) {
}

for _, tt := range tests {
t.Run("", func(t *testing.T) {
t.Run(tt.testName, func(t *testing.T) {
gotSql, gotArgs, err := tt.query.toSQL()
if err != nil {
t.Error(err)
Expand Down
54 changes: 42 additions & 12 deletions openmeter/streaming/clickhouse/queryhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"strings"

"github.com/huandu/go-sqlbuilder"
"github.com/samber/lo"

"github.com/openmeterio/openmeter/openmeter/streaming"
)

const subjectToCustomerIDDictionary = "subject_to_customer_id"

// selectCustomerIdColumn
func selectCustomerIdColumn(eventsTableName string, customers []streaming.Customer, query *sqlbuilder.SelectBuilder) *sqlbuilder.SelectBuilder {
// If there are no customers, we return an empty customer id column
Expand All @@ -21,28 +22,43 @@ func selectCustomerIdColumn(eventsTableName string, customers []streaming.Custom
getColumn := columnFactory(eventsTableName)
subjectColumn := getColumn("subject")

// Build a map of subject to customer id
// Build a map of event subjects to customer ids
var values []string

// For each customer, we map event subjects to customer ids
for _, customer := range customers {
// Add each subject key to the map and map it to the customer id
customerIDSQL := fmt.Sprintf("'%s'", sqlbuilder.Escape(customer.GetUsageAttribution().ID))

// We map the customer key to the customer id if it exists
if customer.GetUsageAttribution().Key != nil {
customerKeySQL := fmt.Sprintf("'%s'", sqlbuilder.Escape(*customer.GetUsageAttribution().Key))
values = append(values, customerKeySQL, customerIDSQL)
}

// We map each subject key to the customer id
for _, subjectKey := range customer.GetUsageAttribution().SubjectKeys {
subjectSQL := fmt.Sprintf("'%s'", sqlbuilder.Escape(subjectKey))
customerIDSQL := fmt.Sprintf("'%s'", sqlbuilder.Escape(customer.GetUsageAttribution().ID))

values = append(values, subjectSQL, customerIDSQL)
}
}

mapAs := "subject_to_customer_id"
mapSQL := fmt.Sprintf("WITH map(%s) as %s", strings.Join(values, ", "), mapAs)
// If there are no values, we return an empty customer id column
// This can happen if none of the customers has key or usage attribution subjects
if len(values) == 0 {
return query.SelectMore("'' AS customer_id")
}

// Name of the map (dictionary)

mapSQL := fmt.Sprintf("WITH map(%s) as %s", strings.Join(values, ", "), subjectToCustomerIDDictionary)

// Add the map to query via WITH clause
mapQuery := sqlbuilder.ClickHouse.NewCTEBuilder().SQL(mapSQL)
query = query.With(mapQuery)

// Select the customer id column
query = query.SelectMore(fmt.Sprintf("%s[%s] AS customer_id", mapAs, subjectColumn))
query = query.SelectMore(fmt.Sprintf("%s[%s] AS customer_id", subjectToCustomerIDDictionary, subjectColumn))

return query
}
Expand All @@ -58,12 +74,26 @@ func customersWhere(eventsTableName string, customers []streaming.Customer, quer
getColumn := columnFactory(eventsTableName)
subjectColumn := getColumn("subject")

// If the customer filter is provided, we add all the subjects to the filter
subjects := lo.Map(customers, func(customer streaming.Customer, _ int) []string {
return customer.GetUsageAttribution().SubjectKeys
})
var subjects []string

// Collect all the subjects from the customers
for _, customer := range customers {
// Add the customer key to the filter if it exists
if customer.GetUsageAttribution().Key != nil {
subjects = append(subjects, *customer.GetUsageAttribution().Key)
}

// Add each subject key to the filter
subjects = append(subjects, customer.GetUsageAttribution().SubjectKeys...)
}

// If there are no subjects, we return an empty subject filter
// This can happen if none of the customers has key or usage attribution subjects
if len(subjects) == 0 {
return query
}

return query.Where(query.In(subjectColumn, lo.Flatten(subjects)))
return query.Where(query.In(subjectColumn, subjects))
}

// subjectWhere applies the subject filter to the query.
Expand Down
25 changes: 23 additions & 2 deletions test/customer/customer.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ func (s *CustomerHandlerTestSuite) TestGetByUsageAttribution(ctx context.Context
Namespace: s.namespace,
CustomerMutate: customer.CustomerMutate{
Name: TestName,
Key: lo.ToPtr(TestKey),
UsageAttribution: customer.CustomerUsageAttribution{
SubjectKeys: TestSubjectKeys,
},
Expand All @@ -645,8 +646,28 @@ func (s *CustomerHandlerTestSuite) TestGetByUsageAttribution(ctx context.Context
require.NotNil(t, cus, "Customer must not be nil")
require.Equal(t, s.namespace, cus.Namespace, "Customer namespace must match")
require.Equal(t, createdCustomer.ID, cus.ID, "Customer ID must match")
require.Equal(t, TestName, cus.Name, "Customer name must match")
require.Equal(t, TestSubjectKeys, cus.UsageAttribution.SubjectKeys, "Customer usage attribution subject keys must match")

// Get the customer by key
cus, err = service.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{
Namespace: s.namespace,
SubjectKey: TestKey,
})

require.NoError(t, err, "Fetching customer must not return error")
require.NotNil(t, cus, "Customer must not be nil")
require.Equal(t, s.namespace, cus.Namespace, "Customer namespace must match")
require.Equal(t, createdCustomer.ID, cus.ID, "Customer ID must match")

// Get the customer by key
cus, err = service.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{
Namespace: s.namespace,
SubjectKey: TestKey,
})

require.NoError(t, err, "Fetching customer must not return error")
require.NotNil(t, cus, "Customer must not be nil")
require.Equal(t, s.namespace, cus.Namespace, "Customer namespace must match")
require.Equal(t, createdCustomer.ID, cus.ID, "Customer ID must match")

// Get the customer by usage attribution with a non-existent subject key
_, err = service.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{
Expand Down
Loading