parallel checks

This commit is contained in:
Celogeek 2021-12-27 16:45:34 +01:00
parent 0981f35615
commit 131f7038a9
Signed by: celogeek
GPG Key ID: E6B7BDCFC446233A
4 changed files with 105 additions and 68 deletions

View File

@ -30,15 +30,21 @@ func (s *FileToUploadStat) Check() {
s.mu.Unlock() s.mu.Unlock()
} }
func (s *FileToUploadStat) Add(filesize int64) { func (s *FileToUploadStat) AddBytes(filesize int64) {
s.mu.Lock() s.mu.Lock()
s.Total++
s.TotalBytes += filesize s.TotalBytes += filesize
s.Progress.ChangeMax64(s.TotalBytes) s.Progress.ChangeMax64(s.TotalBytes)
s.Refresh() s.Refresh()
s.mu.Unlock() s.mu.Unlock()
} }
func (s *FileToUploadStat) Add() {
s.mu.Lock()
s.Total++
s.Refresh()
s.mu.Unlock()
}
func (s *FileToUploadStat) Commit(filereaded int64) { func (s *FileToUploadStat) Commit(filereaded int64) {
s.mu.Lock() s.mu.Lock()
s.UploadedBytes += filereaded s.UploadedBytes += filereaded
@ -60,6 +66,7 @@ func (s *FileToUploadStat) Close() {
func (s *FileToUploadStat) Fail() { func (s *FileToUploadStat) Fail() {
s.mu.Lock() s.mu.Lock()
s.Failed++ s.Failed++
s.Checked++
s.Refresh() s.Refresh()
s.mu.Unlock() s.mu.Unlock()
} }
@ -67,6 +74,7 @@ func (s *FileToUploadStat) Fail() {
func (s *FileToUploadStat) Skip() { func (s *FileToUploadStat) Skip() {
s.mu.Lock() s.mu.Lock()
s.Skipped++ s.Skipped++
s.Checked++
s.Refresh() s.Refresh()
s.mu.Unlock() s.mu.Unlock()
} }

View File

@ -24,20 +24,29 @@ func (p *Piwigo) FileExists(md5 string) bool {
return resp[md5] != nil return resp[md5] != nil
} }
func (p *Piwigo) Upload(file *FileToUpload, stat *FileToUploadStat, nbJobs int, hasVideoJS bool) error { func (p *Piwigo) CheckUploadFile(file *FileToUpload, stat *FileToUploadStat) error {
if !file.Checked() { if !file.Checked() {
if file.MD5() == "" { if file.MD5() == "" {
stat.Fail() stat.Fail()
return errors.New("checksum error") return errors.New("checksum error")
} }
if p.FileExists(file.MD5()) {
stat.Skip()
return errors.New("file already exists")
}
stat.Check() stat.Check()
stat.AddBytes(file.Size())
} }
return nil
}
if p.FileExists(file.MD5()) { func (p *Piwigo) Upload(file *FileToUpload, stat *FileToUploadStat, nbJobs int, hasVideoJS bool) error {
stat.Skip() err := p.CheckUploadFile(file, stat)
return errors.New("file already exists") if err != nil {
return err
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
chunks, err := Base64Chunker(file.FullPath()) chunks, err := Base64Chunker(file.FullPath())
errout := make(chan error) errout := make(chan error)
@ -116,72 +125,88 @@ func (p *Piwigo) UploadChunk(md5 string, chunks chan *Base64ChunkResult, errout
} }
} }
func (p *Piwigo) UploadTree(rootPath string, parentCategoryId int, level int, filter UploadFileType) ([]FileToUpload, error) { func (p *Piwigo) ScanTree(
rootPath, err := filepath.Abs(rootPath) rootPath string,
parentCategoryId int,
level int,
filter *UploadFileType,
stat *FileToUploadStat,
files chan *FileToUpload,
) (err error) {
if level == 0 {
defer close(files)
}
rootPath, err = filepath.Abs(rootPath)
if err != nil { if err != nil {
return nil, err return
} }
categoriesId, err := p.CategoriesId(parentCategoryId) categoriesId, err := p.CategoriesId(parentCategoryId)
if err != nil { if err != nil {
return nil, err return
} }
dirs, err := ioutil.ReadDir(rootPath) dirs, err := ioutil.ReadDir(rootPath)
if err != nil { if err != nil {
return nil, err return
} }
var files []FileToUpload
levelStr := strings.Repeat(" ", level)
for _, dir := range dirs { for _, dir := range dirs {
if !dir.IsDir() { switch dir.IsDir() {
ext := strings.ToLower(filepath.Ext(dir.Name())[1:]) case true: // Directory
if !filter.Has(ext) { dirname := norm.NFC.String(dir.Name())
categoryId, ok := categoriesId[dirname]
if !ok {
var resp struct {
Id int `json:"id"`
}
err = p.Post("pwg.categories.add", &url.Values{
"name": []string{strings.ReplaceAll(dirname, "'", `\'`)},
"parent": []string{fmt.Sprint(parentCategoryId)},
}, &resp)
if err != nil {
return
}
categoryId = resp.Id
}
err = p.ScanTree(filepath.Join(rootPath, dirname), categoryId, level+1, filter, stat, files)
if err != nil {
return
}
case false: // File
file := &FileToUpload{
Dir: rootPath,
Name: dir.Name(),
CategoryId: parentCategoryId,
}
if !filter.Has(file.Ext()) {
continue continue
} }
filename := filepath.Join(rootPath, dir.Name()) stat.Add()
md5, err := Md5File(filename) files <- file
if err != nil {
return nil, err
}
status := "OK"
if p.FileExists(md5) {
status = "SKIP"
}
fmt.Printf("%s - %s %s - %s\n", levelStr, dir.Name(), md5, status)
if status == "OK" {
files = append(files, FileToUpload{
Dir: rootPath,
Name: dir.Name(),
CategoryId: parentCategoryId,
})
}
continue
} }
dirname := norm.NFC.String(dir.Name())
categoryId, ok := categoriesId[dirname]
fmt.Printf("%s%s\n", levelStr, dirname)
if !ok {
var resp struct {
Id int `json:"id"`
}
err = p.Post("pwg.categories.add", &url.Values{
"name": []string{strings.ReplaceAll(dirname, "'", `\'`)},
"parent": []string{fmt.Sprint(parentCategoryId)},
}, &resp)
if err != nil {
return nil, err
}
categoryId = resp.Id
}
newFiles, err := p.UploadTree(filepath.Join(rootPath, dirname), categoryId, level+1, filter)
if err != nil {
return nil, err
}
files = append(files, newFiles...)
} }
return files, nil return nil
}
func (p *Piwigo) CheckFiles(filesToCheck chan *FileToUpload, files chan *FileToUpload, stat *FileToUploadStat, nbJobs int) {
defer close(files)
wgChecker := &sync.WaitGroup{}
for i := 0; i < nbJobs; i++ {
wgChecker.Add(1)
go func() {
defer wgChecker.Done()
for file := range filesToCheck {
err := p.CheckUploadFile(file, stat)
if err != nil {
continue
}
files <- file
}
}()
}
wgChecker.Wait()
} }

View File

@ -40,12 +40,10 @@ func (c *ImagesUploadCommand) Execute(args []string) error {
} }
stat := &piwigo.FileToUploadStat{ stat := &piwigo.FileToUploadStat{
Progress: progressbar.DefaultBytes(1, "prepare"), Progress: progressbar.DefaultBytes(1, "..."),
} }
defer stat.Close() defer stat.Close()
stat.Add(file.Size()) stat.Add()
stat.Refresh()
err = p.Upload(file, stat, c.NbJobs, hasVideoJS) err = p.Upload(file, stat, c.NbJobs, hasVideoJS)
if err != nil { if err != nil {
return err return err

View File

@ -1,9 +1,8 @@
package piwigocli package piwigocli
import ( import (
"fmt"
"github.com/celogeek/piwigo-cli/internal/piwigo" "github.com/celogeek/piwigo-cli/internal/piwigo"
"github.com/schollz/progressbar/v3"
) )
type ImagesUploadTreeCommand struct { type ImagesUploadTreeCommand struct {
@ -23,11 +22,18 @@ func (c *ImagesUploadTreeCommand) Execute(args []string) error {
return err return err
} }
files, err := p.UploadTree(c.Dirname, c.CategoryId, 0, status.UploadFileType) stat := &piwigo.FileToUploadStat{
if err != nil { Progress: progressbar.DefaultBytes(1, "..."),
return err }
defer stat.Close()
filesToCheck := make(chan *piwigo.FileToUpload, 1000)
files := make(chan *piwigo.FileToUpload, 1000)
go p.ScanTree(c.Dirname, c.CategoryId, 0, &status.UploadFileType, stat, filesToCheck)
go p.CheckFiles(filesToCheck, files, stat, 8)
for range files {
} }
fmt.Println("Total", len(files))
return nil return nil
} }