diff --git a/atom b/atom index f80b999..23d8ece 100755 --- a/atom +++ b/atom @@ -1204,100 +1204,185 @@ transcodeLauncher() { } worker() { + set +e while : do - if [ -f "$tempdir/worker-lock" ] + echo work + read line + if [[ $line == AtOM:Die ]] then - sleep 0.001 + break + elif [[ $line == AtOM:Sleep ]] + then + sleep 1 continue + fi + taskid=${line%%|*} + rest="${line#*|}|" + sourcefileid=${rest%%|*} + rest=${rest#*|} + required=${rest%%|*} + rest=${rest#*|} + commandline=${rest%%|*} + rest=${rest#*|} + cleanup=${rest%%|*} + rest=${rest#*|} + destfileid=${rest%%|*} + rest=${rest#*|} + destfilename=${rest%%|*} + rest=${rest#*|} + if eval $commandline >/dev/null 2>>"$tempdir/errors.log" + then + echo "finished $taskid|$sourcefileid|$destfileid|$destfilename" else - touch "$tempdir/worker-lock" - concurrency=$(Select worker_comm value <<<"parameter = concurrency") - active_workers=$(Select tasks 'count(*)' <<<"status = 1") - if (( active_workers < concurrency )) + echo "failed $taskid" + [ -n "$filename" ] \ + && eval rm -f $filename + fi + if [ -n "$cleanup" -a -n "$required" ] + then + echo "cleanup $required" + read answer + if (( answer == 1 )) then - echo ' -SELECT COUNT(*) -FROM tasks -WHERE status = 0 -AND requires is NULL; + eval rm -f $cleanup + fi + fi + done + return +} -SELECT - id, - source_file, - required, - command_line, - cleanup, - fileid, - filename -FROM tasks -WHERE status = 0 -AND requires is NULL -ORDER BY source_file -LIMIT 1; -' >&3 - read -u4 count - if (( count == 0 )) +master() { + for workerid in ${!workers[@]} + do + if read -t0.001 -u$((200+workerid)) workercommand workerquery + then + break + fi + done + (( ${#workers[@]} == 0 )) && break + if [ -n "$workercommand" ] + then + case $workercommand in + 'work') + if [ -n "$quit" ] then - rm "$tempdir/worker-lock" - exit + eval echo AtOM:Die '>&'$((100+workerid)) + wait ${workers[workerid]} + break + eval $((100+workerid))'>&-' + eval $((200+workerid))'<&-' + rm "$tempdir"/worker${workerid}{in,out} + unset workers[workerid] + elif (( active < concurrency )) + then + echo ' + SELECT COUNT(*) + FROM tasks + WHERE status = 0; + + SELECT COUNT(*) + FROM tasks + WHERE status = 0 + AND requires is NULL; + + SELECT + id, + source_file, + required, + command_line, + cleanup, + fileid, + filename + FROM tasks + WHERE status = 0 + AND requires is NULL + ORDER BY source_file + LIMIT 1; + ' >&3 + read -u4 remaining + read -u4 ready + if (( remaining == 0 )) + then + eval echo AtOM:Die '>&'$((100+workerid)) + wait ${workers[workerid]} + eval $((100+workerid))'>&-' + eval $((200+workerid))'<&-' + rm "$tempdir"/worker${workerid}{in,out} + unset workers[workerid] + continue + elif (( ready == 0 )) + then + line=AtOM:Sleep + else + (( ++active )) + read -u4 line + taskid=${line%%|*} + Update tasks status 1 <<<"id = $taskid" + fi + eval echo '"$line" >&'$((100+workerid)) + else + eval echo AtOM:Die '>&'$((100+workerid)) + wait ${workers[workerid]} + eval $((100+workerid))'>&-' + eval $((200+workerid))'<&-' + rm "$tempdir"/worker${workerid}{in,out} + unset workers[workerid] fi - read -u4 line - taskid=${line%%|*} - rest="${line#*|}|" + ;; + 'finished') + (( active-- )) || true + taskid=${workerquery%%|*} + rest="${workerquery#*|}|" sourcefileid=${rest%%|*} rest=${rest#*|} - required=${rest%%|*} - rest=${rest#*|} - commandline=${rest%%|*} - rest=${rest#*|} - cleanup=${rest%%|*} - rest=${rest#*|} destfileid=${rest%%|*} rest=${rest#*|} destfilename=${rest%%|*} - rest=${rest#*|} - Update tasks status 1 <<<"id = $taskid" - rm "$tempdir/worker-lock" - if eval $commandline 2>>"$tempdir/errors.log" + Delete tasks <<<"id = $taskid" + if [ -n "$destfilename" ] then - Delete tasks <<<"id = $taskid" - if [ -n "$destfilename" ] - then - echo \ - "UPDATE destination_files" \ - "SET filename=\"$destfilename\"," \ - " last_change=(" \ - " SELECT last_change" \ - " FROM source_files" \ - " WHERE id=$sourcefileid"\ - " )" \ - "WHERE id=$destfileid;" \ - >&3 - fi + echo \ + "UPDATE destination_files" \ + "SET filename=\"$destfilename\"," \ + " last_change=(" \ + " SELECT last_change" \ + " FROM source_files" \ + " WHERE id=$sourcefileid" \ + " )" \ + "WHERE id=$destfileid;" \ + >&3 + fi + ;; + 'failed') + (( --active )) || true + (( ++failed )) + taskid=$workerquery + faildepends=$( + Select tasks 'COUNT(*)' <<-EOWhere + requires = taskid + EOWhere + ) + (( failed+=faildepends )) + Update tasks status 2 <<<"id = $taskid" + Update tasks status 2 <<<"requires = $taskid" + ;; + 'cleanup') + required=$workerquery + echo "SELECT COUNT(*) + FROM tasks + WHERE ( status = 0 OR status = 1 ) + AND required = $required;">&3 + read -u4 count + if (( count == 0 )) + then + eval echo 1 '>&'$((100+workerid)) else - Update tasks status 2 <<<"id = $taskid" - [ -n "$filename" ] && rm -f "$filename" - [ -n "$cleanup" ] && rm -f "$cleanup" + eval echo 0 '>&'$((100+workerid)) fi - if [ -n "$cleanup" -a -n "$required" ] - then - echo "SELECT COUNT(*) - FROM tasks - WHERE ( status = 0 OR status = 1 ) - AND required = $required;">&3 - read -u4 count - if (( count == 0 )) - then - eval rm $cleanup - fi - fi - else - rm "$tempdir/worker-lock" - exit - fi - fi - done + ;; + esac + fi } #UI @@ -1491,12 +1576,6 @@ echo ' ); CREATE INDEX tasks_by_key ON tasks ( key ); CREATE INDEX tasks_by_sourcefile ON tasks ( source_file ); - - CREATE TEMPORARY TABLE worker_comm( - parameter TEXT UNIQUE NOT NULL, - value TEXT - ); - CREATE INDEX wrkcomm_by_param ON worker_comm ( parameter ); ' >&3 echo ' @@ -1686,10 +1765,68 @@ do done echo 'COMMIT;' >&3 echo -e "\rCreated ${count:-0} tasks for $filecount files" -unset count -InsertOrUpdate worker_comm value 1 <<<"parameter concurrency" -worker +rm -f "$tempdir"/worker* +concurrency=$(( maxload / 2 )) +active=0 +for (( i=0 ; i < concurrency ; i++ )) +do + (( ++wnum )) + mkfifo "$tempdir"/worker${wnum}{in,out} + worker $wnum <"$tempdir"/worker${wnum}in >"$tempdir"/worker${wnum}out & + workers[wnum]=$! + eval exec $((100+wnum))'>"$tempdir"/worker${wnum}in' + eval exec $((200+wnum))'<"$tempdir"/worker${wnum}out' +done +concurrencychange=$(date +%s) +taskcount=$count +while : +do + if read -n 1 -t 0.01 userinput + then + case $userinput in + '+') + ((maxload++)) + ;; + '-') + ((--maxload)) || ((maxload=1)) + ;; + [qQ]) + quit=1 + ;; + esac + fi + read humanload garbage < /proc/loadavg + load=${humanload%.*} + if [ -z "$quit" ] && (( $(date +%s)-concurrencychange >= loadinterval )) + then + if (( concurrency > 1 )) \ + && (( load > maxload )) + then + concurrencychange=$(date +%s) + (( --concurrency )) + elif (( load < maxload )) + then + concurrencychange=$(date +%s) + (( ++concurrency )) + (( ++wnum )) + mkfifo "$tempdir"/worker${wnum}{in,out} + worker $wnum \ + <"$tempdir"/worker${wnum}in \ + >"$tempdir"/worker${wnum}out & + workers[wnum]=$! + eval exec $((100+wnum))'>"$tempdir"/worker${wnum}in' + eval exec $((200+wnum))'<"$tempdir"/worker${wnum}out' + fi + fi + master + echo -en "\rload: $humanload / $maxload" \ + " workers: $active / $concurrency" \ + " done: $(( (taskcount - remaining ) * 100 / taskcount ))%" \ + " $((taskcount - remaining)) of $taskcount ($failed failed)" +done + +echo closeDatabase