Removed older versions

This commit is contained in:
Akash Bora
2022-07-18 17:04:47 +05:30
committed by GitHub
parent 1d45e22e55
commit 816d1150bd
25 changed files with 0 additions and 2375 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 213 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.5 KiB

File diff suppressed because it is too large Load Diff

View File

@@ -1,24 +0,0 @@
#Read this to use the python version.
This version is comparitively lower in size and works the same inside python environment.
Things to setup before running the python file:
The GUI of Datamosher-Pro.py given in this source code version is optimised for Windows; for other OS, download the python version from release page.
1) First setup ffglitch if you are using this repo only. (See readme.txt inside ffglitch folder to download ffglitch)
(Make sure you make it Unix executable if you are on Mac and also give permission)
Note: No need to setup the ffglitch if you downloaded the python version from release page as all the versions are pre-provided there
2) After doing that, run the datamosher-pro.py file and click yes if module error pops up and let the modules get downloaded.
If it doesn't work then install the modules manually-
# Modules you need to install:
-- tkinter
If not installed then use-
(pip install tk) or (sudo apt-get install python3-tk)
-- numpy
(pip install numpy)
-- imageio
(pip install imageio)
-- imageio-ffmpeg
(pip install imageio-ffmpeg)
# For Mac, you need to install one more module:
-- tkmacosx
(pip install tkmacosx)
3) Do not delete or move any assets or folder that are linked with the main file or else it will show error.
After the setup the application will open and you can try datamoshing.

View File

@@ -1,11 +0,0 @@
All ffglitch builds are given in the release python version, so download that directly
For people downloading this repo,
Download ffglitch from ffglitch.org or the direct links are given below:
For windows:
https://ffglitch.org/pub/bin/win64/ffglitch-0.9.3-win64.7z
For linux:
https://ffglitch.org/pub/bin/linux64/ffglitch-0.9.3-linux64.7z
For mac:
https://ffglitch.org/pub/bin/mac64/ffglitch-0.9.3-mac64.7z
After downloading, extract the files and move the ffedit and ffgac file in this ffglitch folder of datamosher pro.

View File

@@ -1,75 +0,0 @@
// dd_ring_buffer.js
// works kinda like an audio delay
// stacks the previous n frames into a buffer
// global variable holding forward motion vectors from previous frames
var prev_fwd_mvs = [ ];
// change these values to use a smaller or greater number of frames to
// perform the average of motion vectors
// try making the delay long enough to overlap an edit in the content ...
var delay = 10;
// divisor controls "feedback" ... or "feedforward" which ever is a better description ...
var feedback = 0.5; // a number between 0.000001 and .... yeah - controls how much of the buffered mv gets into the next pass
var divisor = 1.0/feedback;
function glitch_frame(frame)
{
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
// update variable holding forward motion vectors from previous
// frames. note that we perform a deep copy of the clean motion
// vector values before modifying them.
let json_str = JSON.stringify(fwd_mvs);
let deep_copy = JSON.parse(json_str);
// push to the end of array
prev_fwd_mvs.push(deep_copy);
// drop values from earliest frames to always keep the same tail
// length
if ( prev_fwd_mvs.length > delay )
prev_fwd_mvs = prev_fwd_mvs.slice(1);
// bail out if we still don't have enough frames
if ( prev_fwd_mvs.length != delay )
return;
// replace all motion vectors of current frame with an average
// of the motion vectors from the previous 10 frames
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
let delay_row = prev_fwd_mvs[0][i];
let insert_row = prev_fwd_mvs[delay-1][i];
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
let dmv = delay_row[j];
let imv = insert_row[j];
// THIS IS WHERE THE MAGIC HAPPENS
// temp copy of the incoming vectors
let x = mv[0];
let y = mv[1];
// pull their replacements out of the buffer
mv[0] = dmv[0];
mv[1] = dmv[1];
// feedback the 'old' with the 'new' for next time
imv[0] = (dmv[0] / divisor) + x;
imv[1] = (dmv[1] / divisor) + y;
// rinse and repeat
}
}
}

View File

@@ -1,71 +0,0 @@
// dd_delay.js
// works kinda like an audio delay
// stacks the previous n frames into a buffer
// global variable holding forward motion vectors from previous frames
var prev_fwd_mvs = [ ];
// change these values to use a smaller or greater number of frames to
// perform the average of motion vectors
// try making the delay long enough to overlap an edit in the content ...
var delay = 20;
function glitch_frame(frame)
{
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
// update variable holding forward motion vectors from previous
// frames. note that we perform a deep copy of the clean motion
// vector values before modifying them.
let json_str = JSON.stringify(fwd_mvs);
let deep_copy = JSON.parse(json_str);
// push to the end of array
prev_fwd_mvs.push(deep_copy);
// drop values from earliest frames to always keep the same tail
// length
if ( prev_fwd_mvs.length > delay )
prev_fwd_mvs = prev_fwd_mvs.slice(1);
// bail out if we still don't have enough frames
if ( prev_fwd_mvs.length != delay )
return;
// replace all motion vectors of current frame with an average
// of the motion vectors from the previous 10 frames
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
let delay_row = prev_fwd_mvs[0][i];
let insert_row = prev_fwd_mvs[delay-1][i];
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
let dmv = delay_row[j];
let imv = insert_row[j];
// THIS IS WHERE THE MAGIC HAPPENS
// temp copy of the incoming vectors
let x = mv[0];
let y = mv[1];
// pull their replacements out of the buffer
mv[0] = dmv[0];
mv[1] = dmv[1];
// feedback the 'old' with the 'new' for next time
imv[0] = x;
imv[1] = y;
// rinse and repeat
}
}
}

View File

@@ -1,58 +0,0 @@
// dd_RandomDamage(invertRandomN).js
// invert x and y component of mv for random number of frames if threshold met for frame
let threshold = 95;
var TRIGGERED = 0;
var nFrames = 10;
var frameCount = 0;
var MAGNITUDE = 20;
function glitch_frame(frame)
{
var do_or_not = Math.random() * 100;
if(do_or_not > threshold){
if(TRIGGERED > 0){
}else{
TRIGGERED = 1;
frameCount = 0;
nFrames = Math.random() * MAGNITUDE;
}
}
// only do the glitch if our random number crosses the threshold
if(TRIGGERED > 0 & frameCount <= nFrames){
frameCount++;
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
var M_H = fwd_mvs.length/2;
// clear horizontal element of all motion vectors
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
var M_W = row.length/2;
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
// THIS IS WHERE THE MAGIC HAPPENS
// STOP XY
mv[0] = 0 - mv[0];
mv[1] = 0 - mv[1];
}
}
}else{
TRIGGERED = 0;
}
}

View File

@@ -1,52 +0,0 @@
// dd_mirror_X.js
// clean buffer :
var buffer = [ ];
var ZOOM = -20;
function glitch_frame(frame)
{
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
// note that we perform a deep copy of the clean motion
// vector values before modifying them.
let json_str = JSON.stringify(fwd_mvs);
let deep_copy = JSON.parse(json_str);
// stick em in the buffer
buffer = deep_copy;
var M_H = fwd_mvs.length/2;
// VERTICALLY
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
var row2 = buffer[i];
//var row2 = fwd_mvs[(fwd_mvs.length-1)-i];
var M_W = row.length/2;
// HORIZONTALLY
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
var mv2 = row2[(row.length - 1) - j];
// THIS IS WHERE THE MAGIC HAPPENS
//if(i>M_W){
mv[0] = 0-mv2[0];
mv[1] = mv2[1];
//}
}
}
}

View File

@@ -1,67 +0,0 @@
// dd_MultiplySlowest_50.js
// Multiply slowest moving mv's
var LARGEST = 0;
var SOME_PERCENTAGE = 0.5;
var MULTIPLE = 10;
// global variable holding forward motion vectors from previous frames
var prev_fwd_mvs = [ ];
// change this value to use a smaller or greater number of frmes to average
var tail_length = 20;
function glitch_frame(frame)
{
LARGEST = 0;
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
// 1st loop - find the fastest mv
// this ends-up in LARGEST as the square of the hypotenuse (mv[0]*mv[0]) + (mv[1]*mv[1])
let W = fwd_mvs.length;
for ( let i = 0; i < fwd_mvs.length; i++ )
{
let row = fwd_mvs[i];
// rows
let H = row.length;
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
// THIS IS WHERE THE MEASUREMENT HAPPENS
var this_mv = (mv[0] * mv[0])+(mv[1] * mv[1]);
if ( this_mv > LARGEST){
LARGEST = this_mv;
}
}
}
// then find those mv's which are bigger than SOME_PERCENTAGE of LARGEST
// and then replace them with the average mv from the last n frames
for ( let i = 0; i < fwd_mvs.length; i++ )
{
let row = fwd_mvs[i];
// rows
let H = row.length;
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
// THIS IS WHERE THE MAGIC HAPPENS
var this_mv = (mv[0] * mv[0])+(mv[1] * mv[1]);
if (this_mv < (LARGEST * SOME_PERCENTAGE)){
mv[0] = mv[0] * MULTIPLE;
mv[1] = mv[1] * MULTIPLE;
}
}
}
}

View File

@@ -1,37 +0,0 @@
// dd_sheer.js
var ZOOM = -20;
function glitch_frame(frame)
{
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
var M_H = fwd_mvs.length/2;
// clear horizontal element of all motion vectors
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
var M_W = row.length/2;
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
// THIS IS WHERE THE MAGIC HAPPENS
//if(i>M_W){
mv[0] = mv[0] + ((i - M_W) / 100)*ZOOM;
mv[1] = mv[1] + ((j - M_H) / 100)*ZOOM;
//}
}
}
}

View File

@@ -1,74 +0,0 @@
// dd_RandomDamage(antiGrav).js
// anitgravityify if threshold met for frame
let threshold = 98;
// global variable holding forward motion vectors from previous frames
var old_mvs = [ ];
// a variable for gravity
var rt = 0;
var gravity = 0
var orig_gravity = 5;
var TRIGGERED = 0;
var frameCount = 10;
var count = 0;
function glitch_frame(frame)
{
var do_or_not = Math.random() * 100;
// only do the glitch if our random number crosses the threshold
if(do_or_not > threshold | TRIGGERED == 1){
if(TRIGGERED == 0){
gravity = orig_gravity;
TRIGGERED = 1;
rt = 0;
}
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
// buffer first set of vectors. . .
if(rt == 0){
let json_str = JSON.stringify(fwd_mvs);
let deep_copy = JSON.parse(json_str);
// push to the end of array
old_mvs[0] = (deep_copy);
rt = 1;
}
// clear horizontal element of all motion vectors
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
let old_row = old_mvs[0][i];
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
let omv = old_row[j];
// THIS IS WHERE THE MAGIC HAPPENS
mv[0] = mv[0];
//if(mv[1] < 0){
var nmv = mv[1];
mv[1] = omv[1];
omv[1] = nmv + omv[1] + gravity;
//gravity++;
//}else{
// mv[1] = mv[1];
//}
}
}
count++;
if(count >= frameCount){
TRIGGERED = 0;
count = 0;
}
}
}

View File

@@ -1,38 +0,0 @@
// dd_zero.js
// only fuck things up if mv > movement_threshold
var movement_threshold = 3;
function glitch_frame(frame)
{
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
// columns
let W = fwd_mvs.length;
for ( let i = 0; i < fwd_mvs.length; i++ )
{
let row = fwd_mvs[i];
// rows
let H = row.length;
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
// THIS IS WHERE THE MAGIC HAPPENS
if ( (mv[0] * mv[0])+(mv[1] * mv[1]) > movement_threshold*movement_threshold){
//mv[0] = Math.sin(i/W*Math.PI*2)*mv[0];
//mv[1] = Math.cos(j/H*Math.PI*2)*mv[1];
mv[0] = 0;//mv[0] * 10;
mv[1] = 0;//mv[1] * 10;
}
}
}
}

View File

@@ -1,37 +0,0 @@
// dd_slam_zoom_in.js
var ZOOM = 20;
function glitch_frame(frame)
{
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
var M_H = fwd_mvs.length/2;
// clear horizontal element of all motion vectors
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
var M_W = row.length/2;
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
// THIS IS WHERE THE MAGIC HAPPENS
//if(i>M_W){
mv[0] = ((M_W - j) / 100)*ZOOM;
mv[1] = ((M_H - i) / 100)*ZOOM;
//}
}
}
}

View File

@@ -1,68 +0,0 @@
// dd_RandomDamage(progZoom).js
// progressive Zoom x and y components of mv if threshold met for frame
let threshold = 95;
var ZOOM = 0;
var doZOOM = 0;
var TRIGGERED = 0;
var nFrames = 5;
var frameCount = 0;
function glitch_frame(frame)
{
var do_or_not = Math.random() * 100;
if(do_or_not > threshold){
if(TRIGGERED > 0){
}else{
TRIGGERED = 1;
frameCount = 0;
ZOOM = 0;
}
}
// only do the glitch if our random number crosses the threshold
if(TRIGGERED > 0 & frameCount <= nFrames){
frameCount++;
ZOOM+= 10
var do_dir = Math.random() * 100;
if(do_dir > 50){
doZOOM = 0 - ZOOM;
}else{
doZOOM = ZOOM
}
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
var M_H = fwd_mvs.length/2;
// clear horizontal element of all motion vectors
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
var M_W = row.length/2;
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
// THIS IS WHERE THE MAGIC HAPPENS
// ZOOM X & Y VECTORS
mv[0] = mv[0] + ((M_W - j) / 10)*doZOOM;
mv[1] = mv[1] + ((M_H - i) / 10)*doZOOM;
}
}
}else{
TRIGGERED = 0;
}
}

View File

@@ -1,56 +0,0 @@
// dd_RandomDamage(stopXY).js
// stop x and y component of mv for n framesif threshold met for frame
let threshold = 95;
var TRIGGERED = 0;
var nFrames = 10;
var frameCount = 0;
function glitch_frame(frame)
{
var do_or_not = Math.random() * 100;
if(do_or_not > threshold){
if(TRIGGERED > 0){
}else{
TRIGGERED = 1;
frameCount = 0;
}
}
// only do the glitch if our random number crosses the threshold
if(TRIGGERED > 0 & frameCount <= nFrames){
frameCount++;
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
var M_H = fwd_mvs.length/2;
// clear horizontal element of all motion vectors
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
var M_W = row.length/2;
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
// THIS IS WHERE THE MAGIC HAPPENS
// STOP XY
mv[0] = 0;
mv[1] = 0;
}
}
}else{
TRIGGERED = 0;
}
}

View File

@@ -1,29 +0,0 @@
var randomness = 10;
var bias = (randomness/2);
function glitch_frame(frame)
{
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
// clear horizontal element of all motion vectors
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
// THIS IS WHERE THE MAGIC HAPPENS
mv[0] = mv[0] + (Math.floor((Math.random() * randomness) -bias));
mv[1] = mv[1] + (Math.floor((Math.random() * randomness) -bias));
}
}
}

View File

@@ -1,38 +0,0 @@
// dd_zoom_in.js
var ZOOM = 20;
function glitch_frame(frame)
{
// bail out if we have no motion vectors
let mvs = frame["mv"];
if ( !mvs )
return;
// bail out if we have no forward motion vectors
let fwd_mvs = mvs["forward"];
if ( !fwd_mvs )
return;
var M_H = fwd_mvs.length/2;
// clear horizontal element of all motion vectors
for ( let i = 0; i < fwd_mvs.length; i++ )
{
// loop through all rows
let row = fwd_mvs[i];
var M_W = row.length/2;
for ( let j = 0; j < row.length; j++ )
{
// loop through all macroblocks
let mv = row[j];
// THIS IS WHERE THE MAGIC HAPPENS
//if(i>M_W){
mv[0] = mv[0] + ((M_W - j) / 100)*ZOOM;
mv[1] = mv[1] + ((M_H - i) / 100)*ZOOM;
//}
}
}
}

View File

@@ -1,27 +0,0 @@
from .container import avi
__all__ = ['Index']
class Index(object):
"""Index is an index of video frame data."""
def __init__(self):
pass
@staticmethod
def from_file(filename: str):
instance = Index()
instance.filename = filename
instance.index = None
# Assume AVI for now
instance.index = avi.AVIFile.from_file(filename)
return instance
def __getattr__(self, index):
return getattr(self.index, index)
def __iter__(self):
return iter(self.index)

View File

@@ -1 +0,0 @@

View File

@@ -1,6 +0,0 @@
IFRAME_HEADER = b'\x00\x00\x01\xb0'
def is_iframe(frame):
"""Determine whether frame is an I frame."""
return frame[:4] == IFRAME_HEADER

View File

@@ -1 +0,0 @@

View File

@@ -1,124 +0,0 @@
import struct
from pymosh.codec.mpeg4 import is_iframe
from . import riff
class AVIFile(object):
"""A wrapper for AVI files."""
def __init__(self):
self.riff = riff.RiffIndex()
self.streams = []
self.frame_order = []
@staticmethod
def from_file(filename: str):
instance = AVIFile()
instance.riff = riff.RiffIndex.from_file(filename=filename)
header = instance.riff.find(b'LIST', b'hdrl')
# Get stream info
stream_lists = header.find_all(b'LIST', b'strl')
for l in stream_lists:
strh = l.find(b'strh')
data = strh.data
fccType, = struct.unpack(b'4s', data[:4])
stream = Stream(len(instance.streams), fccType)
instance.streams.append(stream)
instance.split_streams()
return instance
def __iter__(self):
return iter(self.streams)
def add_frame(self, chunk):
stream_num = int(chunk.header[:2])
if stream_num < len(self.streams):
self.frame_order.append(
(stream_num, len(self.streams[stream_num])))
self.streams[stream_num].add_frame(chunk)
def split_streams(self):
movi = self.riff.find(b'LIST', b'movi')
for chunk in movi:
self.add_frame(chunk)
def combine_streams(self):
chunks = []
for frame_record in self.frame_order:
stream_num, frame_num = frame_record
stream = self.streams[stream_num]
frame = stream[frame_num]
chunks.append(frame)
return chunks
def _video(self):
return filter(lambda stream: stream.type == b'vids', self.streams)
video = property(_video)
def _audio(self):
return filter(lambda stream: stream.type == b'auds', self.streams)
audio = property(_audio)
def rebuild(self):
"""Rebuild RIFF tree and index from streams."""
movi = self.riff.find(b'LIST', b'movi')
movi.chunks = self.combine_streams()
self.rebuild_index()
def rebuild_index(self):
old_index = self.riff.find(b'idx1')
movi = self.riff.find(b'LIST', b'movi')
data = b''
offset = 0
flags = {
'base': 0x00000000,
'keyframe': 0x00000010,
}
for chunk in movi:
length = len(chunk)
frame_flags = flags['base']
# If it's a video keyframe or audio frame, use keyframe flag
if (chunk.header[2] == b'd' and is_iframe(chunk)) or (chunk.header[2] == b'w'):
frame_flags |= flags['keyframe']
data += struct.pack(b'<4sIII', chunk.header, frame_flags, offset,
length+8)
offset += length + 8 + (length % 2)
new_index = riff.RiffDataChunk(b'idx1', data)
self.riff.find(b'RIFF').replace(old_index, new_index)
def write(self, fh):
self.riff.write(fh)
class Stream(object):
def __init__(self, num, stream_type):
self.num = int(num)
self.type = stream_type
self.chunks = []
def add_frame(self, chunk):
self.chunks.append(chunk)
def __getitem__(self, index):
return self.chunks.__getitem__(index)
def __iter__(self):
return self.chunks.__iter__()
def __len__(self):
return len(self.chunks)
def append(self, *args):
return self.chunks.append(*args)
def extend(self, *args):
return self.chunks.extend(*args)
def replace(self, chunks):
self.chunks = chunks

View File

@@ -1,263 +0,0 @@
import os
import struct
from io import IOBase
list_headers = (b'RIFF', b'LIST')
class UnexpectedEOF(Exception):
pass
class RiffIndexChunk(object):
def __init__(self, fh, header, length, position):
self.file = fh
self.header = header
self.length = int(length)
self.position = position
def __str__(self):
return str(self.bytes())
def bytes(self) -> bytes:
return self.header + struct.pack('<I', self.length) + self.data
def __len__(self):
return self.length
def __getslice__(self, start, end):
if start is None:
start = 0
if end is None:
end = self.length
current = self.file.tell()
self.file.seek(self.position+start)
if start < end and start <= self.length:
if end > self.length:
end = self.length
data = self.file.read(end-start)
self.file.seek(current)
return data
else:
return ''
def __getitem__(self, index):
if isinstance(index, slice):
return self.__getslice__(index.start, index.stop)
return self[index:index+1]
def _data(self):
"""Read data from the file."""
current_position = self.file.tell()
self.file.seek(self.position)
data = self.file.read(self.length)
self.file.seek(current_position)
if self.length % 2:
data += b'\x00' # Padding byte
return data
data = property(_data)
def as_data(self):
"""Return a RiffDataChunk read from the file."""
raise NotImplementedError()
class RiffIndexList(RiffIndexChunk):
def __init__(self, header, list_type, *args, **kwargs):
self.header = header
self.type = list_type
self.file = kwargs.get('file', None)
self.position = kwargs.get('position', 0)
self.chunks = kwargs.get('chunks', [])
def __getitem__(self, index):
return self.chunks[index]
def __setitem__(self, index, value):
return self.chunks.__setitem__(index, value)
def __delitem__(self, index):
return self.chunks.__delitem__(index)
def __iter__(self):
return iter(self.chunks)
def __len__(self):
"""Return total data length of the list and its headers."""
return self.chunk_length() + len(self.type) + len(self.header) + 4
def chunk_length(self):
length = 0
for chunk in self.chunks:
chunk_len = len(chunk)
length += chunk_len + 8 # Header and length bytes
length += chunk_len % 2 # Pad byte
return length
def __str__(self):
return str(self.bytes())
def bytes(self) -> bytes:
"""Returns a byte representation of the chunk."""
length_bytes = struct.pack('<I', self.chunk_length() + len(self.type))
return self.header + length_bytes + self.type
class NotFound(Exception):
"""Indicates a chunk or list was not found by the find method."""
pass
def find(self, header, list_type=None):
"""Find the first chunk with specified header and optional list type."""
for chunk in self:
if chunk.header == header and (list_type is None or (header in
list_headers and chunk.type == list_type)):
return chunk
elif chunk.header in list_headers:
try:
result = chunk.find(header, list_type)
return result
except chunk.NotFound:
pass
if list_type is None:
raise self.NotFound('Chunk \'{0}\' not found.'.format(header))
else:
raise self.NotFound('List \'{0} {1}\' not found.'.format(header,
list_type))
def find_all(self, header, list_type=None):
"""Find all direct children with header and optional list type."""
found = []
for chunk in self:
if chunk.header == header and (not list_type or (header in
list_headers and chunk.type == list_type)):
found.append(chunk)
return found
def replace(self, child, replacement):
"""Replace a child chunk with something else."""
for i in range(len(self.chunks)):
if self.chunks[i] == child:
self.chunks[i] = replacement
def remove(self, child):
"""Remove a child element."""
for i in range(len(self)):
if self[i] == child:
del self[i]
class RiffDataChunk(object):
"""A RIFF chunk with data in memory instead of a file."""
def __init__(self, header, data):
self.header = header
self.length = len(data)
self.data = data
@staticmethod
def from_data(data):
"""Create a chunk from data including header and length bytes."""
header, _ = struct.unpack('4s<I', data[:8])
data = data[8:]
return RiffDataChunk(header, data)
def bytes(self) -> bytes:
"""Returns a byte array representation of the chunk."""
return self.header + struct.pack('<I', self.length) + self.data
def __str__(self):
return str(self.bytes())
def __len__(self):
return self.length
def __getslice__(self, start, end):
return self.data[start:end]
def __getitem__(self, index):
return self.data[index]
class RiffIndex(RiffIndexList):
def __init__(self):
self.file = None
self.chunks = []
@staticmethod
def from_file(filename: str):
instance = RiffIndex()
instance.file = open(filename, 'rb')
instance.size = instance.get_size()
instance.scan_file()
return instance
def write(self, fh: IOBase) -> None:
def print_chunks(chunks):
for chunk in chunks:
fh.write(chunk.bytes())
if chunk.header in (b'RIFF', b'LIST'):
print_chunks(chunk.chunks)
print_chunks(self.chunks)
def get_size(self):
current = self.file.tell()
self.file.seek(0, 2)
size = self.file.tell()
self.file.seek(current)
return size
def readlen(self, length):
buf = self.file.read(length)
if len(buf) == length:
return buf
else:
raise UnexpectedEOF(
'End of file reached after {0} bytes.'.format(len(buf)))
def scan_file(self):
header = self.readlen(4)
if header == b'RIFF':
length, list_type = struct.unpack('<I4s', self.readlen(8))
chunks = self.scan_chunks(length-4)
self.chunks.append(RiffIndexList(header, list_type, file=self.file,
position=0, chunks=chunks))
else:
raise Exception('Not a RIFF file!')
def scan_chunks(self, data_length):
chunks = []
total_length = 0
while total_length < data_length:
header = self.readlen(4)
total_length += 4
length, = struct.unpack('<I', self.file.read(4))
total_length += length + 4 # add 4 for itself
position = self.file.tell()
if header in list_headers:
list_type = self.readlen(4)
data = self.scan_chunks(length-4)
if length % 2:
# Padding byte
self.file.seek(1, os.SEEK_CUR)
total_length += 1
chunks.append(RiffIndexList(header, list_type, file=self.file,
position=position, chunks=data))
else:
self.file.seek(length, os.SEEK_CUR)
if length % 2:
# Padding byte
self.file.seek(1, os.SEEK_CUR)
total_length += 1
chunks.append(RiffIndexChunk(
self.file, header, length, position))
return chunks
def close(self):
self.file.close()