feat: implement go routines to speed up process (#22)
genos1998 authored Nov 27, 2024
1 parent 9078ff0 commit aa27036
Showing 1 changed file with 78 additions and 50 deletions.
128 changes: 78 additions & 50 deletions september2024october2024/license.go
@@ -9,6 +9,7 @@ import (
"os"
"sort"
"strings"
"sync"
"time"
graphqlfunc "upgradationScript/graphqlFunc"
"upgradationScript/logger"
@@ -151,6 +152,56 @@ func extractFileName(rawURL string) (string, error) {
return query.Get("fileName"), nil
}

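// processFile fetches a single SBOM file from the given S3 bucket and key prefix,
// parses it as a CycloneDX document, and updates the license information of each
// component through the GraphQL client. The temporary local copy of the file is
// removed when the function returns.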
func processFile(
ctx context.Context,
fileName string,
s3Client *graphqlfunc.S3Client,
gqlient graphql.Client,
bucketName, keyPrefix string,
) error {
logger.Sl.Debug("Processing file:", fileName)

key := s3Client.MakeS3Key(bucketName, keyPrefix, fileName)
if !s3Client.ObjectExists(ctx, bucketName, key) {
return fmt.Errorf("file not found: %s", key)
}

savefile, err := os.Create(fileName)
if err != nil {
return fmt.Errorf("os.Create: error: %s", err)
}
defer os.Remove(fileName)

if err := s3Client.Download(ctx, bucketName, key, savefile); err != nil {
return fmt.Errorf("unable to download file: %s, error: %s", key, err.Error())
}
savefile.Close()

data, err := os.ReadFile(fileName)
if err != nil {
return fmt.Errorf("os.ReadFile: error: %s", err.Error())
}

var dx CycloneDx
if err := json.Unmarshal(data, &dx); err != nil {
return fmt.Errorf("json.Unmarshal: error: %s", err.Error())
}

componentPointers, err := translateComponents(dx.Components)
if err != nil {
return fmt.Errorf("translateComponents: error: %s", err.Error())
}

for _, v := range componentPointers {
if _, err := updateComponentLicense(ctx, gqlient, v.Id, v.Licenses); err != nil {
return fmt.Errorf("updateComponentLicense: id: %s licenses: %v error: %s", v.Id, v.Licenses, err.Error())
}
}

logger.Sl.Debug("Completed processing file:", fileName)
return nil
}

func ingestLicenses(gqlient graphql.Client) error {

logger.Sl.Debugf("-----ingesting licenses from sboms--------")
@@ -238,59 +289,36 @@ func ingestLicenses(gqlient graphql.Client) error {
return fmt.Errorf("ingestLicenses: MakeS3Client: err: %s", err.Error())
}

s3Client := *makeS3client

i := 0

for fileName := range fileNameToComponents {
logger.Logger.Debug("=============")
logger.Logger.Sugar().Debug("iteration:", i, " of total:", len(fileNameToComponents))

i++

key := s3Client.MakeS3Key("ssd-temporal", "sbom", fileName)
found := s3Client.ObjectExists(context.Background(), "ssd-temporal", key)
if !found {
return fmt.Errorf("ingestLicenses: file not found")
}

savefile, err := os.Create(fileName)
if err != nil {
return fmt.Errorf("ingestLicenses: os.Create: fileName: %s error: %s", fileName, err.Error())
}

defer os.Remove(fileName)

if err := s3Client.Download(context.Background(), "ssd-temporal", key, savefile); err != nil {
return fmt.Errorf("ingestLicenses: unable to download file key: %s error: %s", key, err.Error())
}

savefile.Close()

data, err := os.ReadFile(fileName)
if err != nil {
return fmt.Errorf("ingestLicenses: os.ReadFile: fileName: %s error: %s", fileName, err.Error())
}

var dx CycloneDx
if err := json.Unmarshal(data, &dx); err != nil {
return fmt.Errorf("ingestLicenses: sbom json.Unmarshal: error: %s", err.Error())
}

// Convert all components over to our internal representation, and index these
// by the bom reference, so we can look them up quickly later.
componentPointers, err := translateComponents(dx.Components)
if err != nil {
return fmt.Errorf("ingestLicenses: translateComponents: error: %s", err.Error())
}

for _, v := range componentPointers {
if _, err := updateComponentLicense(context.Background(), gqlient, v.Id, v.Licenses); err != nil {
return fmt.Errorf("ingestLicenses: updateComponentLicense: id: %s licenses: %v error: %s", v.Id, v.Licenses, err.Error())
const numWorkers = 20
fileChan := make(chan string, len(fileNameToComponents))
var wg sync.WaitGroup

processed := 0
totalFiles := len(fileNameToComponents)

// Worker pool
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for fileName := range fileChan {
if err := processFile(context.Background(), fileName, makeS3client, gqlient, "ssd-temporal", "sbom"); err != nil {
logger.Logger.Sugar().Errorf("error processing file %s: %v", fileName, err.Error())
}
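// processed is shared by all workers; this increment is not synchronized.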
processed++
logger.Logger.Sugar().Debugf("no. of file processed: %v outOfFiles: %v", processed, totalFiles)
}
}
}()
}

// Enqueue tasks
for fileName := range fileNameToComponents {
fileChan <- fileName
}
close(fileChan)

// Wait for all workers to finish
wg.Wait()

logger.Sl.Debugf("-----ingested licenses from sboms--------")

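The commit replaces the sequential per-file loop in ingestLicenses with a fixed pool of 20 worker goroutines draining a buffered channel of file names; per-file errors are now logged and processing continues, whereas the old loop returned on the first failure. One caveat: the shared processed counter is incremented from every worker without synchronization, which is a data race. Below is a minimal, self-contained sketch of the same bounded worker-pool pattern with a race-free progress counter via sync/atomic; processOne is a hypothetical placeholder for the repository's processFile, which needs the real S3 and GraphQL clients.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// processOne stands in for processFile; the real function downloads the SBOM
// from S3, parses it, and pushes license updates over GraphQL.
func processOne(fileName string) error {
    fmt.Println("processing", fileName)
    return nil
}

func main() {
    files := []string{"a.json", "b.json", "c.json"}

    const numWorkers = 4
    fileChan := make(chan string, len(files))

    var wg sync.WaitGroup
    var processed atomic.Int64
    total := int64(len(files))

    // Fixed pool of workers draining the channel.
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for fileName := range fileChan {
                if err := processOne(fileName); err != nil {
                    fmt.Printf("error processing file %s: %v\n", fileName, err)
                }
                // Atomic add keeps the shared progress count race-free.
                done := processed.Add(1)
                fmt.Printf("no. of files processed: %d out of: %d\n", done, total)
            }
        }()
    }

    // Enqueue every file, then close the channel so the range loops exit.
    for _, f := range files {
        fileChan <- f
    }
    close(fileChan)

    // Wait for all workers to finish before reporting completion.
    wg.Wait()
    fmt.Println("done")
}

Because the channel is buffered to the number of files, the enqueue loop never blocks, so it could equally run before the workers start; closing the channel is what lets each worker's range loop terminate so wg.Wait can return.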