Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stage local files to an OSDF origin #19

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
22 changes: 22 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,28 @@ builds:
goarch: ppc64le
- goos: darwin
goarch: ppc64le
- env:
- CGO_ENABLED=0
main: ./cmd/osdf_stage
goos:
- linux
- windows
- darwin
goarch:
- "amd64"
- "arm64"
- "ppc64le"
id: "osdf_stage"
binary: osdf_stage
tags:
- forceposix
ignore:
- goos: windows
goarch: arm64
- goos: windows
goarch: ppc64le
- goos: darwin
goarch: ppc64le

archives:
- id: osdf-client
Expand Down
229 changes: 229 additions & 0 deletions cmd/osdf_stage/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package main

import (
"fmt"
"net/url"
"os"
"path"
"regexp"
"strings"

"github.com/jessevdk/go-flags"
stashcp "github.com/htcondor/osdf-client/v6"
"github.com/htcondor/osdf-client/v6/classads"
log "github.com/sirupsen/logrus"
)

var (
version = "dev"
commit = "none"
date = "unknown"
builtBy = "unknown"
)

type Options struct {
// Turn on the debug logging
Debug bool `short:"d" long:"debug" description:"Turn on debug logging"`

// Token file to use for reading and/or writing
Token string `long:"token" short:"t" description:"Token file to use for reading and/or writing"`

// Version information
Version bool `long:"version" short:"v" description:"Print the version and exit"`

// Use the hook protocol
Hook bool `long:"hook" description:"Implement the HTCondor hook behavior"`

// Progress bars
ProgessBars bool `long:"progress" short:"p" description:"Show progress bars, turned on if run from a terminal"`

// Mount prefix; e.g., /mnt/stash/ospool/osgconnect
MountPrefix string `long:"mount" short:"m" description:"Prefix corresponding to the local mount point of the origin"`

// Origin prefix; e.g., osdf://ospool/osgconnect
OriginPrefix string `long:"origin-prefix" short:"o" description:"Prefix corresponding to the local origin"`

// Shadow origin prefix; e.g., osdf://ospool/osgconnect-shadow/
ShadowOriginPrefix string `long:"shadow-prefix" short:"s" description:"Prefix corresponding to the shadow origin" required:"true"`

// Sources to ingest
Sources []string `positional-arg-name:"sources" short:"i" long:"input" description:"Source file(s)" default:"-"`
}

var options Options

var parser = flags.NewParser(&options, flags.Default)

func main() {

// Capture the start time of the transfer
if _, err := parser.Parse(); err != nil {
if flagsErr, ok := err.(*flags.Error); ok && flagsErr.Type == flags.ErrHelp {
fmt.Fprintln(os.Stderr, `
This utility parses a job ClassAd and, for each "osdf://" URL found in
the input files that is in a locally-mounted origin, copies the file
over to a "shadow origin". The files in the shadow origin are given a
unique based on their last modification time; this means that local
files can be modified without causing cache consistency issues.

Terminology:
- Origin prefix: Where in the OSDF namespace the origin exports its
files. Example: osdf://osg-connect/protected
- Mount prefix: The location in the locally-mounted filesystem that
correspondings to the files in the origin prefix. Example:
/mnt/cephfs/protected
- Shadow prefix: Where in the OSDF namespace the resulting files should
be uploaded. Example: osdf://osg-connect-shadow/protected`);
os.Exit(0)
} else {
log.Errorln(err)
os.Exit(1)
}
}

if options.Debug {
// Set logging to debug level
err := setLogging(log.DebugLevel)
if err != nil {
log.Panicln("Failed to set logging level to Debug:", err)
}
} else {
err := setLogging(log.ErrorLevel)
if err != nil {
log.Panicln("Failed to set logging level to Error:", err)
}

}

originPrefixUri, err := url.Parse(options.OriginPrefix)
if err != nil {
log.Errorln("Origin prefix must be a URL (osdf://...):", err)
os.Exit(1)
}
if originPrefixUri.Scheme != "osdf" {
log.Errorln("Origin prefix scheme must be osdf://:", originPrefixUri.Scheme)
os.Exit(1)
}
originPrefixPath := path.Clean("/" + originPrefixUri.Host + "/" + originPrefixUri.Path)
log.Debugln("Local origin prefix:", originPrefixPath)

if options.Version {
fmt.Println("Version:", version)
fmt.Println("Build Date:", date)
fmt.Println("Build Commit:", commit)
fmt.Println("Built By:", builtBy)
os.Exit(0)
}

// Set the progress bars to the command line option
stashcp.Options.ProgressBars = options.ProgessBars
stashcp.Options.Token = options.Token

// Check if the program was executed from a terminal
// https://rosettacode.org/wiki/Check_output_device_is_a_terminal#Go
if fileInfo, _ := os.Stdout.Stat(); (fileInfo.Mode() & os.ModeCharDevice) != 0 {
stashcp.Options.ProgressBars = true
} else {
stashcp.Options.ProgressBars = false
}

var sources []string
var extraSources []string
if options.Hook {
buffer := make([]byte, 100*1024)
bytesread, err := os.Stdin.Read(buffer)
if err != nil {
log.Errorln("Failed to read ClassAd from stdin:", err)
os.Exit(1)
}
classad, err := classads.ParseClassAd(string(buffer[:bytesread]))
if err != nil {
log.Errorln("Failed to parse ClassAd from stdin: ", err)
os.Exit(1)
}
inputList, err := classad.Get("TransferInput")
if err != nil || inputList == nil {
// No TransferInput, no need to transform...
os.Exit(0)
}
inputListStr, ok := inputList.(string)
if !ok {
log.Errorln("TransferInput is not a string")
os.Exit(1)
}
re := regexp.MustCompile(`[,\s]+`)
for _, source := range re.Split(inputListStr, -1) {
log.Debugln("Examining transfer input file", source)
if (strings.HasPrefix(source, options.MountPrefix)) {
sources = append(sources, source)
} else {
// Replace the osdf:// prefix with the local mount path
source_uri, err := url.Parse(source)
source_uri_scheme := strings.SplitN(source_uri.Scheme, "+", 2)[0]
if err == nil && source_uri_scheme == "osdf" {
source_path := path.Clean("/" + source_uri.Host + "/" + source_uri.Path)
if (strings.HasPrefix(source_path, originPrefixPath)) {
sources = append(sources, options.MountPrefix + source_path[len(originPrefixPath):])
continue
}
}
extraSources = append(extraSources, source)
}
}
} else {
log.Debugln("Len of source:", len(options.Sources))
if len(options.Sources) < 1 {
log.Errorln("No ingest sources")
parser.WriteHelp(os.Stdout)
os.Exit(1)
}
sources = options.Sources
}
log.Debugln("Sources:", sources)

var result error
var xformSources []string
for _, src := range sources {
_, newSource, result := stashcp.DoShadowIngest(src, options.MountPrefix, options.ShadowOriginPrefix)
if result != nil {
// What's the correct behavior on failure? For now, we silently put the transfer
// back on the original list. This is arguably the wrong approach as it might
// give the user surprising semantics -- but keeping this until we have a bit more
// confidence in the approach.
extraSources = append(extraSources, src)
log.Errorf("Failed to ingest %s: %s. Adding original back to the transfer list",
src, result.Error())
continue
}
xformSources = append(xformSources, newSource)
}

// Exit with failure
if result != nil {
// Print the list of errors
log.Errorln(stashcp.GetErrors())
if stashcp.ErrorsRetryable() {
log.Errorln("Errors are retryable")
os.Exit(11)
}
os.Exit(1)
}
if options.Hook {
inputsStr := strings.Join(extraSources, ", ")
if len(extraSources) > 0 && len(xformSources) > 0 {
inputsStr = inputsStr + ", " + strings.Join(xformSources, ", ")
} else if len(xformSources) > 0 {
inputsStr = strings.Join(xformSources, ", ")
}
fmt.Printf("TransferInput = \"%s\"", inputsStr)
}
}

func setLogging(logLevel log.Level) error {
textFormatter := log.TextFormatter{}
textFormatter.DisableLevelTruncation = true
textFormatter.FullTimestamp = true
log.SetFormatter(&textFormatter)
log.SetLevel(logLevel)
return nil
}
89 changes: 89 additions & 0 deletions handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ func (e *StoppedTransferError) Error() string {
}


type HttpErrResp struct {
Code int
Err string
}

func (e *HttpErrResp) Error() string {
return e.Err
}


// SlowTransferError is an error that is returned when a transfer takes longer than the configured timeout
type SlowTransferError struct {
BytesTransferred int64
Expand Down Expand Up @@ -923,3 +933,82 @@ func walkDir(path string, client *gowebdav.Client) ([]string, error) {
}
return files, nil
}

func StatHttp(dest *url.URL, namespace Namespace) (uint64, error) {

scitoken_contents, err := getToken(dest, namespace, false, "")
if err != nil {
return 0, err
}

// Parse the writeback host as a URL
writebackhostUrl, err := url.Parse(namespace.WriteBackHost)
if err != nil {
return 0, err
}
dest.Host = writebackhostUrl.Host
dest.Scheme = "https"

canDisableProxy := CanDisableProxy()
disableProxy := !IsProxyEnabled()

var resp *http.Response
for {
defaultTransport := http.DefaultTransport.(*http.Transport).Clone()
if disableProxy {
log.Debugln("Performing HEAD (without proxy)", dest.String())
defaultTransport.Proxy = nil
} else {
log.Debugln("Performing HEAD", dest.String())
}

client := &http.Client{Transport: defaultTransport}
req, err := http.NewRequest("HEAD", dest.String(), nil)
if err != nil {
log.Errorln("Failed to create HTTP request:", err)
return 0, err
}

if scitoken_contents != "" {
req.Header.Set("Authorization", "Bearer " + scitoken_contents)
}

resp, err = client.Do(req)
if err == nil {
break
}
if urle, ok := err.(*url.Error); canDisableProxy && !disableProxy && ok && urle.Unwrap() != nil {
if ope, ok := urle.Unwrap().(*net.OpError); ok && ope.Op == "proxyconnect" {
log.Warnln("Failed to connect to proxy; will retry without:", ope)
disableProxy = true
continue
}
}
log.Errorln("Failed to get HTTP response:", err)
return 0, err
}

if resp.StatusCode == 200 {
defer resp.Body.Close()
contentLengthStr := resp.Header.Get("Content-Length")
if len(contentLengthStr) == 0 {
log.Errorln("HEAD response did not include Content-Length header")
return 0, errors.New("HEAD response did not include Content-Length header")
}
contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
log.Errorf("Unable to parse Content-Length header value (%s) as integer: %s", contentLengthStr, err)
return 0, err
}
return uint64(contentLength), nil
} else {
response_b, err := io.ReadAll(resp.Body)
if err != nil {
log.Errorln("Failed to read error message:", err)
return 0, err
}
defer resp.Body.Close()
return 0, &HttpErrResp{resp.StatusCode, fmt.Sprintf("Request failed (HTTP status %d): %s", resp.StatusCode, string(response_b))}
}
}

Loading