stream processing

This commit is contained in:
Celogeek 2022-12-27 13:17:02 +01:00
parent 4dbc7b5f8a
commit eaba40018a
Signed by: celogeek
GPG Key ID: E6B7BDCFC446233A
2 changed files with 70 additions and 47 deletions

View File

@ -19,14 +19,18 @@ import (
imageconverter "go-comic-converter/internal/image-converter"
)
type Images struct {
Id int
Title string
type ImageDetails struct {
*Images
Data string
Width int
Height int
}
type Images struct {
Id int
Title string
}
type EPub struct {
Path string
@ -42,6 +46,11 @@ type EPub struct {
Images []*Images
FirstImageTitle string
Error error
Processors func()
ProcessorsCount int
ProcessorsWG *sync.WaitGroup
ProcessorsResult []chan *ImageDetails
}
func NewEpub(path string) *EPub {
@ -49,6 +58,13 @@ func NewEpub(path string) *EPub {
if err != nil {
panic(err)
}
nbcpu := runtime.NumCPU()
results := make([]chan *ImageDetails, nbcpu)
for r := range results {
results[r] = make(chan *ImageDetails)
}
return &EPub{
Path: path,
@ -60,6 +76,10 @@ func NewEpub(path string) *EPub {
ViewWidth: 0,
ViewHeight: 0,
Quality: 75,
ProcessorsCount: nbcpu,
ProcessorsWG: &sync.WaitGroup{},
ProcessorsResult: results,
}
}
@ -138,51 +158,43 @@ func (e *EPub) LoadDir(dirname string) *EPub {
titleFormat := fmt.Sprintf("%%0%dd", len(fmt.Sprint(len(images)-1)))
wg := &sync.WaitGroup{}
wg.Add(runtime.NumCPU())
for i := range images {
e.Images = append(e.Images, &Images{
Id: i,
Title: fmt.Sprintf(titleFormat, i),
})
}
type todoStruct struct {
Id int
type Todo struct {
*Images
Path string
}
type resultStruct struct {
Id int
Data string
Width int
Height int
todo := make([]chan *Todo, e.ProcessorsCount)
for i := range todo {
todo[i] = make(chan *Todo)
}
todo := make(chan *todoStruct)
result := make(chan *resultStruct)
for i := 0; i < runtime.NumCPU(); i++ {
go func() {
defer wg.Done()
for task := range todo {
e.Processors = func() {
for i := 0; i < e.ProcessorsCount; i++ {
e.ProcessorsWG.Add(1)
go func(ticket int) {
defer e.ProcessorsWG.Done()
defer close(e.ProcessorsResult[ticket])
for task := range todo[ticket] {
data, w, h := imageconverter.Convert(task.Path, true, e.ViewWidth, e.ViewHeight, e.Quality)
result <- &resultStruct{task.Id, data, w, h}
e.ProcessorsResult[ticket] <- &ImageDetails{task.Images, data, w, h}
}
}()
}(i % e.ProcessorsCount)
}
go func() {
for id, path := range images {
todo <- &todoStruct{id, path}
for i, path := range images {
todo[i%e.ProcessorsCount] <- &Todo{e.Images[i], path}
}
for i := range todo {
close(todo[i])
}
close(todo)
wg.Wait()
close(result)
}()
e.Images = make([]*Images, len(images))
for res := range result {
fmt.Printf("%d done\n", res.Id)
e.Images[res.Id] = &Images{
Id: res.Id,
Title: fmt.Sprintf(titleFormat, res.Id),
Data: res.Data,
Width: res.Width,
Height: res.Height,
}
}
e.FirstImageTitle = e.Images[0].Title
@ -207,12 +219,6 @@ func (e *EPub) Write() error {
{"OEBPS/nav.xhtml", e.Render(TEMPLATE_NAV, e)},
{"OEBPS/Text/style.css", TEMPLATE_STYLE},
}
for _, img := range e.Images {
text := fmt.Sprintf("OEBPS/Text/%s.xhtml", img.Title)
image := fmt.Sprintf("OEBPS/Images/%s.jpg", img.Title)
zipContent = append(zipContent, []string{text, e.Render(TEMPLATE_TEXT, img)})
zipContent = append(zipContent, []string{image, img.Data})
}
wz := zip.NewWriter(w)
defer wz.Close()
@ -221,5 +227,22 @@ func (e *EPub) Write() error {
return err
}
}
e.Processors()
for i, img := range e.Images {
fmt.Printf("%03d/%03d\n", i+1, len(e.Images))
text := fmt.Sprintf("OEBPS/Text/%s.xhtml", img.Title)
image := fmt.Sprintf("OEBPS/Images/%s.jpg", img.Title)
details := <-e.ProcessorsResult[i%e.ProcessorsCount]
if err := e.WriteFile(wz, text, e.Render(TEMPLATE_TEXT, details)); err != nil {
return err
}
if err := e.WriteFile(wz, image, details.Data); err != nil {
return err
}
}
e.ProcessorsWG.Wait()
return nil
}

View File

@ -89,7 +89,7 @@ func main() {
err := epub.NewEpub(opt.Output).
SetSize(profile.Width, profile.Height).
SetQuality(opt.Quality).
SetTitle(opt.Input).
SetTitle(opt.Title).
SetAuthor(opt.Author).
LoadDir(opt.Input).
Write()