package main import ( "crypto/sha1" "crypto/sha256" "encoding/hex" "errors" "fmt" "io" "os" "path/filepath" "strings" "sync" "github.com/go-resty/resty/v2" "github.com/schollz/progressbar/v3" photosapi "gitlab.celogeek.com/photos/api/internal/photos/api" ) type UploadCommand struct { Url string `short:"u" long:"url" description:"Url of the instance" required:"true"` Token string `short:"t" long:"token" description:"Token of the instance" required:"true"` File string `short:"f" long:"file" description:"File to upload" required:"true"` Workers uint32 `short:"w" long:"workers" description:"Number of workers for uploading chunks" default:"4"` } type UploadError struct { Err string `json:"error"` Details []string `json:"details"` } func (u *UploadError) Error() string { if len(u.Details) == 0 { return u.Err } return fmt.Sprintf("%s: \n - %s", u.Err, strings.Join(u.Details, "\n - ")) } type UploadCreate struct { UploadId string `json:"upload_id"` } type UploadPartResult struct { UploadId string `json:"upload_id"` Part uint `json:"part"` Size uint `json:"size"` PartSha256 string `json:"sha256"` } type UploadCompleteRequest struct { Sha256 string `json:"sha256" binding:"required,sha256"` Name string `json:"name" binding:"required"` Parts uint `json:"parts" binding:"required"` } type UploadFileRequest struct { Name string Checksum string Chunks []string } type UploadFileResponse struct { Sum string NbChunks uint32 Size uint64 } func (c *UploadCommand) Cli() *resty.Client { return resty.New().SetBaseURL(c.Url).SetAuthScheme("Private").SetAuthToken(c.Token) } func (c *UploadCommand) FileExists() (string, error) { f, err := os.Open(c.File) if err != nil { return "", err } defer f.Close() st, err := f.Stat() if err != nil { return "", err } progress := progressbar.DefaultBytes(st.Size(), fmt.Sprintf("Checking %s", filepath.Base(c.File))) defer progress.Close() tee := io.TeeReader(f, progress) checksum := sha1.New() io.Copy(checksum, tee) sum := hex.EncodeToString(checksum.Sum(nil)) resp, err := c.Cli().R().Head(fmt.Sprintf("/file/%s", sum)) if err != nil { return "", err } if resp.IsSuccess() { return "", errors.New("file already exists") } return sum, nil } func (c *UploadCommand) FileUpload(sum string) error { f, err := os.Open(c.File) if err != nil { return err } defer f.Close() st, err := f.Stat() if err != nil { return err } nbChunks := st.Size() / photosapi.CHUNK_SIZE if st.Size()%photosapi.CHUNK_SIZE > 0 { nbChunks++ } uploadFile := &UploadFileRequest{ Name: filepath.Base(c.File), Chunks: make([]string, nbChunks), Checksum: sum, } cli := c.Cli() progress := progressbar.DefaultBytes(st.Size(), fmt.Sprintf("Uploading %s", uploadFile.Name)) defer progress.Close() wg := sync.WaitGroup{} mu := sync.Mutex{} wg.Add(4) wgErrors := make([]error, c.Workers) i := int64(0) for w := uint32(0); w < c.Workers; w++ { go func(w uint32) { defer wg.Done() b := make([]byte, photosapi.CHUNK_SIZE) for { mu.Lock() part := i i++ n, err := f.Read(b) mu.Unlock() if n == 0 { if err == io.EOF { break } wgErrors[w] = err return } checksum := sha1.New() checksum.Write(b[0:n]) sum := hex.EncodeToString(checksum.Sum(nil)) resp, err := cli.R().Head(fmt.Sprintf("/file/chunk/%s", sum)) if err != nil { wgErrors[w] = err return } if resp.IsSuccess() { uploadFile.Chunks[part] = sum progress.Add(n) continue } resp, err = cli.R().SetError(&UploadError{}).SetBody(b[0:n]).Post("/file/chunk") if err != nil { wgErrors[w] = err return } if err, ok := resp.Error().(*UploadError); ok { wgErrors[w] = err return } uploadFile.Chunks[part] = sum progress.Add(n) } }(w) } wg.Wait() for _, err := range wgErrors { if err != nil { return err } } resp, err := cli.R().SetBody(uploadFile).SetError(&UploadError{}).SetResult(&UploadFileResponse{}).Post("/file") if err != nil { return err } if err, ok := resp.Error().(*UploadError); ok { logger.Println("Upload failed") return err } if result, ok := resp.Result().(*UploadFileResponse); ok { fmt.Printf("Upload succeed\nSum: %s\nNbChunks: %d\nSize: %d\n", result.Sum, result.NbChunks, result.Size) } return nil } func (c *UploadCommand) Execute(args []string) error { cli := c.Cli() resp, err := cli.R().SetError(&UploadError{}).SetResult(&UploadCreate{}).Post("/upload") if err != nil { return err } if err, ok := resp.Error().(*UploadError); ok { return err } uploadId := resp.Result().(*UploadCreate).UploadId f, err := os.Open(c.File) if err != nil { return err } defer f.Close() st, err := f.Stat() if err != nil { return err } progress := progressbar.DefaultBytes(st.Size(), fmt.Sprintf("Uploading %s", filepath.Base(c.File))) defer progress.Close() tee := io.TeeReader(f, progress) b := make([]byte, photosapi.MaxUploadPartSize) parts := 0 completesha256 := sha256.New() for { n, err := tee.Read(b) if err != nil { if err == io.EOF { break } else { return err } } parts++ partsha256 := sha256.New() partsha256.Write(b[:n]) completesha256.Write(b[:n]) resp, err := cli. R(). SetError(&UploadError{}). SetResult(&UploadPartResult{}). SetQueryParam("part", fmt.Sprint(parts)). SetQueryParam("sha256", hex.EncodeToString(partsha256.Sum(nil))). SetBody(b[:n]). SetPathParam("id", uploadId). Put("/upload/{id}") if err != nil { return err } if err, ok := resp.Error().(*UploadError); ok { return err } } fmt.Printf( "Upload: %s\nParts: %d\n", uploadId, parts, ) resp, err = cli. R(). SetError(&UploadError{}). SetPathParam("id", uploadId). SetBody(&UploadCompleteRequest{ Sha256: hex.EncodeToString(completesha256.Sum(nil)), Parts: uint(parts), Name: filepath.Base(c.File), }). Post("/upload/{id}") if err != nil { return err } if err, ok := resp.Error().(*UploadError); ok { return err } fmt.Printf("Response: %s\n", resp.Body()) cli.R().SetPathParam("id", uploadId).Delete("/upload/{id}") return nil } func init() { parser.AddCommand("upload", "Upload a file", "", &UploadCommand{}) }