diff --git a/cmd/photos-api-cli/upload.go b/cmd/photos-api-cli/upload.go index 4430cf9..9dea438 100644 --- a/cmd/photos-api-cli/upload.go +++ b/cmd/photos-api-cli/upload.go @@ -8,25 +8,23 @@ import ( "io" "os" "path/filepath" + "sync" "github.com/go-resty/resty/v2" "github.com/schollz/progressbar/v3" ) type UploadCommand struct { - Url string `short:"u" long:"url" description:"Url of the instance" required:"true"` - Token string `short:"t" long:"token" description:"Token of the instance" required:"true"` - File string `short:"f" long:"file" description:"File to upload" required:"true"` + Url string `short:"u" long:"url" description:"Url of the instance" required:"true"` + Token string `short:"t" long:"token" description:"Token of the instance" required:"true"` + File string `short:"f" long:"file" description:"File to upload" required:"true"` + Workers uint32 `short:"w" long:"workers" description:"Number of workers for uploading chunks" default:"4"` } type UploadError struct { Error string } -type UploadChunkSuccess struct { - Checksum string -} - type UploadFileRequest struct { Name string Checksum string @@ -39,7 +37,45 @@ type UploadFileResponse struct { Size uint64 } -func (c *UploadCommand) Execute(args []string) error { +func (c *UploadCommand) Cli() *resty.Client { + return resty.New().SetBaseURL(c.Url).SetAuthScheme("Private").SetAuthToken(c.Token) +} + +func (c *UploadCommand) FileExists() (string, error) { + f, err := os.Open(c.File) + if err != nil { + return "", err + } + defer f.Close() + + st, err := f.Stat() + if err != nil { + return "", err + } + chunkSize := int64(1 << 20) + nbChunks := st.Size() / chunkSize + if st.Size()%chunkSize > 0 { + nbChunks++ + } + + progress := progressbar.DefaultBytes(st.Size(), fmt.Sprintf("Checking %s", filepath.Base(c.File))) + defer progress.Close() + tee := io.TeeReader(f, progress) + checksum := sha1.New() + io.Copy(checksum, tee) + sum := hex.EncodeToString(checksum.Sum(nil)) + + resp, err := c.Cli().R().Head(fmt.Sprintf("/file/%s", sum)) + if err != nil { + return "", err + } + if resp.IsSuccess() { + return "", errors.New("file already exists") + } + return sum, nil +} + +func (c *UploadCommand) FileUpload(sum string) error { f, err := os.Open(c.File) if err != nil { return err @@ -56,40 +92,78 @@ func (c *UploadCommand) Execute(args []string) error { nbChunks++ } - uploadFile := &UploadFileRequest{Name: filepath.Base(c.File), Chunks: make([]string, nbChunks)} + uploadFile := &UploadFileRequest{ + Name: filepath.Base(c.File), + Chunks: make([]string, nbChunks), + Checksum: sum, + } + + cli := c.Cli() progress := progressbar.DefaultBytes(st.Size(), fmt.Sprintf("Uploading %s", uploadFile.Name)) defer progress.Close() - cli := resty.New().SetBaseURL(c.Url).SetAuthScheme("Private").SetAuthToken(c.Token) - b := make([]byte, chunkSize) - checksum := sha1.New() - for i := 0; ; i++ { - n, err := f.Read(b) - if n == 0 { - if err == io.EOF { - break - } - return err - } + wg := sync.WaitGroup{} + mu := sync.Mutex{} + wg.Add(4) + wgErrors := make([]error, c.Workers) + i := int64(0) + for w := uint32(0); w < c.Workers; w++ { + go func(w uint32) { + defer wg.Done() + b := make([]byte, chunkSize) + for { + mu.Lock() + part := i + i++ + n, err := f.Read(b) + mu.Unlock() + if n == 0 { + if err == io.EOF { + break + } + wgErrors[w] = err + return + } - resp, err := cli.R().SetError(&UploadError{}).SetResult(&UploadChunkSuccess{}).SetBody(b[0:n]).Post("/file/chunk") + checksum := sha1.New() + checksum.Write(b[0:n]) + sum := hex.EncodeToString(checksum.Sum(nil)) + + resp, err := cli.R().Head(fmt.Sprintf("/file/chunk/%s", sum)) + if err != nil { + wgErrors[w] = err + return + } + + if resp.IsSuccess() { + uploadFile.Chunks[part] = sum + progress.Add(n) + continue + } + + resp, err = cli.R().SetError(&UploadError{}).SetBody(b[0:n]).Post("/file/chunk") + if err != nil { + wgErrors[w] = err + return + } + + if err, ok := resp.Error().(*UploadError); ok { + wgErrors[w] = errors.New(err.Error) + return + } + + uploadFile.Chunks[part] = sum + progress.Add(n) + } + }(w) + } + wg.Wait() + for _, err := range wgErrors { if err != nil { return err } - - if err, ok := resp.Error().(*UploadError); ok { - return errors.New(err.Error) - } - - if result, ok := resp.Result().(*UploadChunkSuccess); ok { - uploadFile.Chunks[i] = result.Checksum - checksum.Write(b[0:n]) - progress.Add(n) - } } - uploadFile.Checksum = hex.EncodeToString(checksum.Sum(nil)) - resp, err := cli.R().SetBody(uploadFile).SetError(&UploadError{}).SetResult(&UploadFileResponse{}).Post("/file") if err != nil { return err @@ -107,6 +181,14 @@ func (c *UploadCommand) Execute(args []string) error { return nil } +func (c *UploadCommand) Execute(args []string) error { + sum, err := c.FileExists() + if err != nil { + return err + } + return c.FileUpload(sum) +} + func init() { parser.AddCommand("upload", "Upload a file", "", &UploadCommand{}) } diff --git a/internal/photos/api/file.go b/internal/photos/api/file.go index ac51f93..c495ef6 100644 --- a/internal/photos/api/file.go +++ b/internal/photos/api/file.go @@ -144,6 +144,20 @@ func (s *Service) FileCreateChunk(c *gin.Context) { }) } +func (s *Service) FileChunkExists(c *gin.Context) { + checksum := c.Param("checksum") + if len(checksum) != 40 { + s.Error(c, http.StatusBadRequest, photoserrors.ErrStoreBadChecksum) + return + } + + if s.Store.ChunkExists(checksum) { + c.Status(http.StatusOK) + } else { + c.Status(http.StatusNotFound) + } +} + func (s *Service) FileExists(c *gin.Context) { checksum := c.Param("checksum") if len(checksum) != 40 { diff --git a/internal/photos/api/main.go b/internal/photos/api/main.go index 684eb10..efaed89 100644 --- a/internal/photos/api/main.go +++ b/internal/photos/api/main.go @@ -60,9 +60,11 @@ func (s *Service) SetupRoutes() { album := s.Gin.Group("/file") album.Use(s.RequireSession) album.POST("", s.FileCreate) - album.POST("/chunk", s.FileCreateChunk) album.HEAD("/:checksum", s.FileExists) + album.POST("/chunk", s.FileCreateChunk) + album.HEAD("/chunk/:checksum", s.FileChunkExists) + s.Gin.NoRoute(func(c *gin.Context) { s.Error(c, http.StatusNotFound, photoserrors.ErrReqNotFound) }) diff --git a/internal/store/core.go b/internal/store/core.go index 7c00941..e7e8bf1 100644 --- a/internal/store/core.go +++ b/internal/store/core.go @@ -46,6 +46,11 @@ func (s *Store) LoadChunk(sum string) (*Chunk, error) { return c, nil } +func (s *Store) ChunkExists(sum string) bool { + c := &Chunk{s, sum, nil} + return c.FileExists() +} + func (c *Chunk) Dir() string { return filepath.Join(c.Path, "storage", c.Sum[0:1], c.Sum[1:2]) }