diff --git a/cmd/hnanalytics/main.go b/cmd/hnanalytics/main.go index 7264819..db8a949 100644 --- a/cmd/hnanalytics/main.go +++ b/cmd/hnanalytics/main.go @@ -2,21 +2,54 @@ package main import ( "context" + "flag" "fmt" "log" + "time" + "hnanalytics/internal/ingest/bigquery" "hnanalytics/internal/whoishiring" ) +const dateLayout = "2006-01-02" + func main() { ctx := context.Background() - svc, err := whoishiring.NewService(ctx, "whoishiring.db") + source := flag.String("source", "api", "ingestion source: api or bigquery") + dbName := flag.String("db", "whoishiring.db", "datastore base name") + startDate := flag.String("start-date", "", "start date (YYYY-MM-DD), inclusive") + endDate := flag.String("end-date", "", "end date (YYYY-MM-DD), inclusive") + incremental := flag.Bool("incremental", false, "run incremental sync mode") + projectID := flag.String("gcp-project", "", "GCP project for BigQuery jobs") + location := flag.String("gcp-location", "", "BigQuery job location, e.g. US") + maxBytesBilled := flag.Int64("max-bytes-billed", 0, "optional BigQuery max bytes billed") + flag.Parse() + + svc, err := whoishiring.NewService(ctx, *dbName) if err != nil { log.Fatal(err) } - defer svc.Close() + defer func() { + if err := svc.Close(); err != nil { + log.Fatal(err) + } + }() + switch *source { + case "api": + runAPISource(ctx, svc) + case "bigquery": + cfg := bigquery.Config{ProjectID: *projectID, Location: *location, MaxBytesBilled: *maxBytesBilled} + if err := runBigQuerySource(ctx, svc, cfg, *startDate, *endDate, *incremental); err != nil { + log.Fatal(err) + } + default: + log.Fatalf("unsupported source %q", *source) + } +} + +func runAPISource(ctx context.Context, svc *whoishiring.Service) { users := []string{"whoishiring", "_whoishiring"} for _, user := range users { scrapedUser, err := svc.ScrapeUser(ctx, user) @@ -49,3 +82,84 @@ func main() { } fmt.Printf("Processed %d analytic items\n", len(analyticItems)) } + +func runBigQuerySource(ctx context.Context, svc 
*whoishiring.Service, cfg bigquery.Config, startDate, endDate string, incremental bool) error { + source, err := bigquery.New(ctx, cfg) + if err != nil { + return err + } + defer source.Close() + + if incremental { + return runBigQueryIncremental(ctx, svc, source, endDate) + } + if startDate == "" || endDate == "" { + return fmt.Errorf("start-date and end-date are required for backfill") + } + start, end, err := parseDateRange(startDate, endDate) + if err != nil { + return err + } + + items, watermark, err := source.BackfillByDateRange(ctx, start, end) + if err != nil { + return err + } + inserted := svc.UpsertAnalyticItems(ctx, items) + if len(items) > 0 { + svc.SetCheckpoint(ctx, "bigquery", whoishiring.Checkpoint{Time: watermark.Time, ID: watermark.ID}) + } + fmt.Printf("Backfilled %d items (%d inserted new rows).\n", len(items), inserted) + return nil +} + +func runBigQueryIncremental(ctx context.Context, svc *whoishiring.Service, source *bigquery.Source, endDate string) error { + watermark := bigquery.Watermark{} + if cp, ok := svc.GetCheckpoint(ctx, "bigquery"); ok { + watermark = bigquery.Watermark{Time: cp.Time, ID: cp.ID} + } + + upperBound := time.Now().UTC().Truncate(24 * time.Hour).Add(24 * time.Hour) + if endDate != "" { + end, err := parseDate(endDate) + if err != nil { + return err + } + upperBound = end.Add(24 * time.Hour) + } + + items, latest, err := source.SyncIncremental(ctx, watermark, upperBound) + if err != nil { + return err + } + inserted := svc.UpsertAnalyticItems(ctx, items) + if len(items) > 0 { + svc.SetCheckpoint(ctx, "bigquery", whoishiring.Checkpoint{Time: latest.Time, ID: latest.ID}) + } + fmt.Printf("Incremental sync returned %d items (%d inserted new rows).\n", len(items), inserted) + return nil +} + +func parseDateRange(startDate, endDate string) (time.Time, time.Time, error) { + start, err := parseDate(startDate) + if err != nil { + return time.Time{}, time.Time{}, err + } + end, err := parseDate(endDate) + if err != nil { + 
return time.Time{}, time.Time{}, err + } + endExclusive := end.Add(24 * time.Hour) + if !start.Before(endExclusive) { + return time.Time{}, time.Time{}, fmt.Errorf("start-date must be before or equal to end-date") + } + return start, endExclusive, nil +} + +func parseDate(value string) (time.Time, error) { + parsed, err := time.Parse(dateLayout, value) + if err != nil { + return time.Time{}, fmt.Errorf("invalid date %q: %w", value, err) + } + return parsed.UTC(), nil +} diff --git a/go.mod b/go.mod index 927f391..3e437a5 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,54 @@ module hnanalytics -go 1.22 +go 1.24.0 + +require ( + cloud.google.com/go/bigquery v1.73.1 + google.golang.org/api v0.267.0 +) + +require ( + cloud.google.com/go v0.123.0 // indirect + cloud.google.com/go/auth v0.18.1 // indirect + cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect + cloud.google.com/go/compute/metadata v0.9.0 // indirect + cloud.google.com/go/iam v1.5.3 // indirect + github.com/apache/arrow/go/v15 v15.0.2 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/google/flatbuffers v23.5.26+incompatible // indirect + github.com/google/s2a-go v0.1.9 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.11 // indirect + github.com/googleapis/gax-go/v2 v2.17.0 // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect + go.opentelemetry.io/otel 
v1.39.0 // indirect + go.opentelemetry.io/otel/metric v1.39.0 // indirect + go.opentelemetry.io/otel/trace v1.39.0 // indirect + golang.org/x/crypto v0.47.0 // indirect + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect + golang.org/x/mod v0.31.0 // indirect + golang.org/x/net v0.49.0 // indirect + golang.org/x/oauth2 v0.35.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.40.0 // indirect + golang.org/x/telemetry v0.0.0-20251203150158-8fff8a5912fc // indirect + golang.org/x/text v0.33.0 // indirect + golang.org/x/time v0.14.0 // indirect + golang.org/x/tools v0.40.0 // indirect + golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect + google.golang.org/genproto v0.0.0-20260128011058-8636f8732409 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260203192932-546029d2fa20 // indirect + google.golang.org/grpc v1.78.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6e9e36b --- /dev/null +++ b/go.sum @@ -0,0 +1,145 @@ +cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY= +cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw= +cloud.google.com/go v0.123.0 h1:2NAUJwPR47q+E35uaJeYoNhuNEM9kM8SjgRgdeOJUSE= +cloud.google.com/go v0.123.0/go.mod h1:xBoMV08QcqUGuPW65Qfm1o9Y4zKZBpGS+7bImXLTAZU= +cloud.google.com/go/auth v0.18.1 h1:IwTEx92GFUo2pJ6Qea0EU3zYvKnTAeRCODxfA/G5UWs= +cloud.google.com/go/auth v0.18.1/go.mod h1:GfTYoS9G3CWpRA3Va9doKN9mjPGRS+v41jmZAhBzbrA= +cloud.google.com/go/auth/oauth2adapt v0.2.8 h1:keo8NaayQZ6wimpNSmW5OPc283g65QNIiLpZnkHRbnc= +cloud.google.com/go/auth/oauth2adapt v0.2.8/go.mod h1:XQ9y31RkqZCcwJWNSx2Xvric3RrU88hAYYbjDWYDL+c= +cloud.google.com/go/bigquery v1.73.1 h1:v//GZwdhtmCbZ87rOnxz7pectOGFS1GNRvrGTvLzka4= +cloud.google.com/go/bigquery v1.73.1/go.mod 
h1:KSLx1mKP/yGiA8U+ohSrqZM1WknUnjZAxHAQZ51/b1k= +cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs= +cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= +cloud.google.com/go/datacatalog v1.26.1 h1:bCRKA8uSQN8wGW3Tw0gwko4E9a64GRmbW1nCblhgC2k= +cloud.google.com/go/datacatalog v1.26.1/go.mod h1:2Qcq8vsHNxMDgjgadRFmFG47Y+uuIVsyEGUrlrKEdrg= +cloud.google.com/go/iam v1.5.3 h1:+vMINPiDF2ognBJ97ABAYYwRgsaqxPbQDlMnbHMjolc= +cloud.google.com/go/iam v1.5.3/go.mod h1:MR3v9oLkZCTlaqljW6Eb2d3HGDGK5/bDv93jhfISFvU= +cloud.google.com/go/longrunning v0.8.0 h1:LiKK77J3bx5gDLi4SMViHixjD2ohlkwBi+mKA7EhfW8= +cloud.google.com/go/longrunning v0.8.0/go.mod h1:UmErU2Onzi+fKDg2gR7dusz11Pe26aknR4kHmJJqIfk= +cloud.google.com/go/monitoring v1.24.3 h1:dde+gMNc0UhPZD1Azu6at2e79bfdztVDS5lvhOdsgaE= +cloud.google.com/go/monitoring v1.24.3/go.mod h1:nYP6W0tm3N9H/bOw8am7t62YTzZY+zUeQ+Bi6+2eonI= +cloud.google.com/go/storage v1.59.0 h1:9p3yDzEN9Vet4JnbN90FECIw6n4FCXcKBK1scxtQnw8= +cloud.google.com/go/storage v1.59.0/go.mod h1:cMWbtM+anpC74gn6qjLh+exqYcfmB9Hqe5z6adx+CLI= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 h1:sBEjpZlNHzK1voKq9695PJSX2o5NEXl7/OL3coiIY0c= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54.0 h1:lhhYARPUu3LmHysQ/igznQphfzynnqI3D75oUyw1HXk= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54.0/go.mod h1:l9rva3ApbBpEJxSNYnwT9N4CDLrWgtq3u8736C5hyJw= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0 h1:s0WlVbf9qpvkh1c/uDAPElam0WrL7fHRIidgZJ7UqZI= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0/go.mod h1:Mf6O40IAyB9zR/1J8nGDDPirZQQPbYJni8Yisy7NTMc= +github.com/apache/arrow/go/v15 
v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE= +github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f h1:Y8xYupdHxryycyPlc9Y+bSQAYZnetRJ70VMVKm5CKI0= +github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f/go.mod h1:HlzOvOjVBOfTGSRXRyY0OiCS/3J1akRGQQpRO/7zyF4= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.13.5-0.20251024222203-75eaa193e329 h1:K+fnvUM0VZ7ZFJf0n4L/BRlnsb9pL/GuDG6FqaH+PwM= +github.com/envoyproxy/go-control-plane/envoy v1.35.0 h1:ixjkELDE+ru6idPxcHLj8LBVc2bFP7iBytj353BoHUo= +github.com/envoyproxy/go-control-plane/envoy v1.35.0/go.mod h1:09qwbGVuSWWAyN5t/b3iyVfz5+z8QWGrzkoqm/8SbEs= +github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8= +github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-jose/go-jose/v4 v4.1.3 h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs= +github.com/go-jose/go-jose/v4 v4.1.3/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod 
h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= +github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc= +github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= +github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= +github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/googleapis/enterprise-certificate-proxy v0.3.11 h1:vAe81Msw+8tKUxi2Dqh/NZMz7475yUvmRIkXr4oN2ao= +github.com/googleapis/enterprise-certificate-proxy v0.3.11/go.mod h1:RFV7MUdlb7AgEq2v7FmMCfeSMCllAzWxFgRdusoGks8= +github.com/googleapis/gax-go/v2 v2.17.0 h1:RksgfBpxqff0EZkDWYuz9q/uWsTVz+kf43LsZ1J6SMc= +github.com/googleapis/gax-go/v2 v2.17.0/go.mod h1:mzaqghpQp4JDh3HvADwrat+6M3MOIDp5YKHhb9PAgDY= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 
v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo= +github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/detectors/gcp v1.38.0 h1:ZoYbqX7OaA/TAikspPl3ozPI6iY6LiIY9I8cUfm+pJs= +go.opentelemetry.io/contrib/detectors/gcp v1.38.0/go.mod h1:SU+iU7nu5ud4oCb3LQOhIZ3nRLj6FNVrKgtflbaf2ts= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 h1:YH4g8lQroajqUwWbq/tr2QX1JFmEXaDLgG+ew9bLMWo= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0/go.mod 
h1:fvPi2qXDqFs8M4B4fmJhE92TyQs9Ydjlg3RvfUp+NbQ= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6hwnZ41NSFTO5q4LYDtJRXBf2PD0rNBkeB/lus= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q= +go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= +go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= +go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= +go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= +go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= +go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= +go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= +go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= +go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= +golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= +golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI= +golang.org/x/mod v0.31.0/go.mod h1:43JraMp9cGx1Rx3AqioxrbrhNsLl2l/iNAvuBkrezpg= +golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= +golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= +golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ= +golang.org/x/oauth2 v0.35.0/go.mod 
h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/telemetry v0.0.0-20251203150158-8fff8a5912fc h1:bH6xUXay0AIFMElXG2rQ4uiE+7ncwtiOdPfYK1NK2XA= +golang.org/x/telemetry v0.0.0-20251203150158-8fff8a5912fc/go.mod h1:hKdjCMrbv9skySur+Nek8Hd0uJ0GuxJIoIX2payrIdQ= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= +golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= +golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/api v0.267.0 h1:w+vfWPMPYeRs8qH1aYYsFX68jMls5acWl/jocfLomwE= +google.golang.org/api v0.267.0/go.mod h1:Jzc0+ZfLnyvXma3UtaTl023TdhZu6OMBP9tJ+0EmFD0= +google.golang.org/genproto v0.0.0-20260128011058-8636f8732409 h1:VQZ/yAbAtjkHgH80teYd2em3xtIkkHd7ZhqfH2N9CsM= +google.golang.org/genproto v0.0.0-20260128011058-8636f8732409/go.mod h1:rxKD3IEILWEu3P44seeNOAwZN4SaoKaQ/2eTg4mM6EM= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 
h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260203192932-546029d2fa20 h1:Jr5R2J6F6qWyzINc+4AM8t5pfUz6beZpHp678GNrMbE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260203192932-546029d2fa20/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= +google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/ingest/bigquery/source.go b/internal/ingest/bigquery/source.go new file mode 100644 index 0000000..07e3f29 --- /dev/null +++ b/internal/ingest/bigquery/source.go @@ -0,0 +1,150 @@ +package bigquery + +import ( + "context" + "fmt" + "math" + "time" + + gcbigquery "cloud.google.com/go/bigquery" + "google.golang.org/api/iterator" + + "hnanalytics/internal/whoishiring" +) + +const ( + defaultDataset = "bigquery-public-data.hacker_news" + defaultTable = "full" +) + +type Config struct { + ProjectID string + Location string + MaxBytesBilled int64 + Dataset string + Table string +} + +type Watermark struct { + Time time.Time + ID int +} + +type Source struct { + client *gcbigquery.Client + cfg Config +} + +type row struct { + ID int64 `bigquery:"id"` + Time time.Time `bigquery:"time"` + Title gcbigquery.NullString `bigquery:"title"` + Dead gcbigquery.NullBool `bigquery:"dead"` + Kids []int64 `bigquery:"kids"` +} + +func New(ctx context.Context, cfg Config) (*Source, error) { + if cfg.ProjectID == "" { + return nil, 
fmt.Errorf("project id is required") + } + if cfg.Dataset == "" { + cfg.Dataset = defaultDataset + } + if cfg.Table == "" { + cfg.Table = defaultTable + } + + client, err := gcbigquery.NewClient(ctx, cfg.ProjectID) + if err != nil { + return nil, err + } + + return &Source{client: client, cfg: cfg}, nil +} + +func (s *Source) Close() error { return s.client.Close() } + +func (s *Source) BackfillByDateRange(ctx context.Context, startInclusive, endExclusive time.Time) ([]whoishiring.AnalyticItem, Watermark, error) { + if !startInclusive.Before(endExclusive) { + return nil, Watermark{}, fmt.Errorf("start must be before end") + } + + query := fmt.Sprintf("\nSELECT\n id,\n time,\n title,\n dead,\n kids\nFROM `%s.%s`\nWHERE time >= @start_time\n AND time < @end_time\nORDER BY time ASC, id ASC", s.cfg.Dataset, s.cfg.Table) + + rows, err := s.readRows(ctx, query, []gcbigquery.QueryParameter{ + {Name: "start_time", Value: startInclusive}, + {Name: "end_time", Value: endExclusive}, + }) + if err != nil { + return nil, Watermark{}, err + } + return toAnalyticItems(rows) +} + +func (s *Source) SyncIncremental(ctx context.Context, watermark Watermark, upperBoundExclusive time.Time) ([]whoishiring.AnalyticItem, Watermark, error) { + query := fmt.Sprintf("\nSELECT\n id,\n time,\n title,\n dead,\n kids\nFROM `%s.%s`\nWHERE (time > @watermark_time OR (time = @watermark_time AND id > @watermark_id))\n AND time < @upper_bound\nORDER BY time ASC, id ASC", s.cfg.Dataset, s.cfg.Table) + + rows, err := s.readRows(ctx, query, []gcbigquery.QueryParameter{ + {Name: "watermark_time", Value: watermark.Time}, + {Name: "watermark_id", Value: watermark.ID}, + {Name: "upper_bound", Value: upperBoundExclusive}, + }) + if err != nil { + return nil, watermark, err + } + return toAnalyticItems(rows) +} + +func (s *Source) readRows(ctx context.Context, queryText string, params []gcbigquery.QueryParameter) ([]row, error) { + query := s.client.Query(queryText) + query.Parameters = params + if 
s.cfg.Location != "" { + query.Location = s.cfg.Location + } + if s.cfg.MaxBytesBilled > 0 { + query.MaxBytesBilled = s.cfg.MaxBytesBilled + } + + it, err := query.Read(ctx) + if err != nil { + return nil, err + } + + rows := make([]row, 0) + for { + var r row + err := it.Next(&r) + if err == iterator.Done { + break + } + if err != nil { + return nil, err + } + rows = append(rows, r) + } + return rows, nil +} + +func toAnalyticItems(rows []row) ([]whoishiring.AnalyticItem, Watermark, error) { + analytics := make([]whoishiring.AnalyticItem, 0, len(rows)) + wm := Watermark{} + for _, r := range rows { + if r.Dead.Valid && r.Dead.Bool { + continue + } + if r.ID > math.MaxInt { + return nil, Watermark{}, fmt.Errorf("row id %d overflows int", r.ID) + } + + var title *string + if r.Title.Valid { + t := r.Title.StringVal + title = &t + } + + analytic := whoishiring.AnalyticItemFromFields(int(r.ID), r.Time.UTC(), title, len(r.Kids)) + analytics = append(analytics, analytic) + wm = Watermark{Time: r.Time.UTC(), ID: int(r.ID)} + } + return analytics, wm, nil +} diff --git a/internal/ingest/bigquery/source_test.go b/internal/ingest/bigquery/source_test.go new file mode 100644 index 0000000..50af74f --- /dev/null +++ b/internal/ingest/bigquery/source_test.go @@ -0,0 +1,30 @@ +package bigquery + +import ( + "testing" + "time" + + gcbigquery "cloud.google.com/go/bigquery" +) + +func TestToAnalyticItems(t *testing.T) { + title := "Ask HN: Who is hiring? 
// Checkpoint is a saved ingestion position, kept in the datastore's
// checkpoints map under a string key.
type Checkpoint struct {
	// Time is the timestamp component of the saved position.
	Time time.Time `json:"time"`
	// ID is the item-id component of the saved position.
	ID int `json:"id"`
	// UpdatedAt is overwritten by SetCheckpoint with the current UTC time
	// whenever the checkpoint is stored.
	UpdatedAt time.Time `json:"updated_at"`
}
s.store.Checkpoints = map[string]Checkpoint{} + } + + return nil } func (s *Service) persist() error { @@ -299,11 +324,14 @@ func (s *Service) ScrapeToAnalyticItems(_ context.Context) ([]AnalyticItem, erro } func toAnalyticItem(item Item) AnalyticItem { - kidsCount := len(item.Kids) + return AnalyticItemFromFields(item.ID, item.Time, item.Title, len(item.Kids)) +} + +func AnalyticItemFromFields(id int, createTime time.Time, title *string, kidsCount int) AnalyticItem { whType := "other" - if item.Title != nil { - title := strings.ToLower(*item.Title) + if title != nil { + title := strings.ToLower(*title) switch { case strings.HasPrefix(title, freelancerTitle): whType = "freelancer" @@ -314,7 +342,38 @@ func toAnalyticItem(item Item) AnalyticItem { } } - return AnalyticItem{ID: item.ID, CreateTime: item.Time, WHType: whType, NumKids: kidsCount} + return AnalyticItem{ID: id, CreateTime: createTime, WHType: whType, NumKids: kidsCount} +} + +func (s *Service) UpsertAnalyticItems(_ context.Context, items []AnalyticItem) int { + s.mu.Lock() + defer s.mu.Unlock() + + count := 0 + for _, item := range items { + if _, exists := s.store.AnalyticItems[item.ID]; !exists { + count++ + } + s.store.AnalyticItems[item.ID] = item + } + + return count +} + +func (s *Service) GetCheckpoint(_ context.Context, key string) (Checkpoint, bool) { + s.mu.Lock() + defer s.mu.Unlock() + + cp, ok := s.store.Checkpoints[key] + return cp, ok +} + +func (s *Service) SetCheckpoint(_ context.Context, key string, checkpoint Checkpoint) { + s.mu.Lock() + defer s.mu.Unlock() + + checkpoint.UpdatedAt = time.Now().UTC() + s.store.Checkpoints[key] = checkpoint } func (s *Service) fetch(ctx context.Context, url string) ([]byte, error) { diff --git a/internal/whoishiring/service_test.go b/internal/whoishiring/service_test.go index 52cc244..a5fc85b 100644 --- a/internal/whoishiring/service_test.go +++ b/internal/whoishiring/service_test.go @@ -1,6 +1,8 @@ package whoishiring import ( + "context" + "os" 
"testing" "time" ) @@ -37,3 +39,35 @@ func TestToAnalyticItem(t *testing.T) { t.Fatalf("expected 3 kids, got %d", analytic.NumKids) } } + +func TestUpsertAnalyticItemsAndCheckpoint(t *testing.T) { + dir := t.TempDir() + oldWD, err := os.Getwd() + if err != nil { + t.Fatalf("Getwd() error = %v", err) + } + if err := os.Chdir(dir); err != nil { + t.Fatalf("Chdir() error = %v", err) + } + t.Cleanup(func() { _ = os.Chdir(oldWD) }) + + svc, err := NewService(context.Background(), "testdb") + if err != nil { + t.Fatalf("NewService() error = %v", err) + } + t.Cleanup(func() { _ = svc.Close() }) + + inserted := svc.UpsertAnalyticItems(context.Background(), []AnalyticItem{{ID: 1}, {ID: 1}}) + if inserted != 1 { + t.Fatalf("expected 1 new insert, got %d", inserted) + } + + svc.SetCheckpoint(context.Background(), "bigquery", Checkpoint{ID: 10, Time: time.Unix(100, 0).UTC()}) + cp, ok := svc.GetCheckpoint(context.Background(), "bigquery") + if !ok { + t.Fatal("expected checkpoint to exist") + } + if cp.ID != 10 { + t.Fatalf("expected checkpoint id 10, got %d", cp.ID) + } +}