Quick Reference

Installation

using Pkg
Pkg.add("Peddy")

Or for development:

julia +1.11 --project=.
julia> using Pkg; Pkg.instantiate()

Minimal Example

using Peddy
using DimensionalData
using Dates

# Create data
times = DateTime(2024, 1, 1):Millisecond(50):DateTime(2024, 1, 1, 0, 1, 0)
vars = [:Ux, :Uy, :Uz, :Ts, :diag_sonic]
data = hcat(
    sin.(range(0, 1, length=length(times))),
    cos.(range(0, 1, length=length(times))),
    0.1 .* randn(length(times)),
    20 .+ 0.01 .* randn(length(times)),
    zeros(length(times))
)
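# `data` is length(times) × length(vars) (one column per variable),
# so the time dimension must be listed first to match its shape.
hf = DimArray(data, (Ti(times), Var(vars)))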

# Configure pipeline
sensor = CSAT3()
qc = PhysicsBoundsCheck()
desp = SimpleSigmundDespiking()
gap = GeneralInterpolation()
out = MemoryOutput()

pipeline = EddyPipeline(
    sensor=sensor,
    quality_control=qc,
    despiking=desp,
    gap_filling=gap,
    output=out
)

# Run
process!(pipeline, hf, nothing)

# Get results
hf_res, lf_res = Peddy.get_results(out)

Common Configurations

Minimal Processing (QC only)

pipeline = EddyPipeline(
    sensor=CSAT3(),
    quality_control=PhysicsBoundsCheck(),
    output=MemoryOutput()
)

Standard Processing

pipeline = EddyPipeline(
    sensor=CSAT3(),
    quality_control=PhysicsBoundsCheck(),
    despiking=SimpleSigmundDespiking(),
    gap_filling=GeneralInterpolation(),
    output=ICSVOutput("/path/to/output")
)

Full Processing with MRD

pipeline = EddyPipeline(
    sensor=IRGASON(),
    quality_control=PhysicsBoundsCheck(),
    gas_analyzer=H2OCalibration(),
    despiking=SimpleSigmundDespiking(),
    gap_filling=GeneralInterpolation(),
    double_rotation=WindDoubleRotation(),
    mrd=OrthogonalMRD(),
    output=NetCDFOutput("/path/to/output")
)

With Logging

logger = ProcessingLogger()

pipeline = EddyPipeline(
    sensor=CSAT3(),
    quality_control=PhysicsBoundsCheck(),
    output=MemoryOutput(),
    logger=logger
)

process!(pipeline, hf, lf)
write_processing_log(logger, "/path/to/log.csv")

Data Access Patterns

Get Variable

ux = hf[Var=At(:Ux)]

Get Time Slice

t0 = DateTime(2024, 1, 1, 12, 0, 0)
slice = hf[Ti=At(t0)]

Get Time Range

t_start = DateTime(2024, 1, 1, 0, 0, 0)
t_end = DateTime(2024, 1, 1, 1, 0, 0)
subset = hf[Ti=Between(t_start, t_end)]

Modify In-Place

ux = @view hf[Var=At(:Ux)]
ux[ux .> 100] .= NaN

Get Statistics

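using Statistics

ux = hf[Var=At(:Ux)]
mean_ux = Peddy.mean_skipnan(ux)
# Invalid samples are stored as NaN (not `missing`), so `skipmissing` would not
# remove them; filter NaN explicitly instead.
std_ux = std(x for x in ux if !isnan(x))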

Pipeline Steps Cheat Sheet

| Step | Type | Purpose | Example |
|------|------|---------|---------|
| Quality Control | AbstractQC | Remove physically impossible values | PhysicsBoundsCheck() |
| Gas Analyzer | AbstractGasAnalyzer | Correct H₂O measurements | H2OCalibration() |
| Despiking | AbstractDespiking | Remove spikes | SimpleSigmundDespiking() |
| Make Continuous | AbstractMakeContinuous | Insert missing timestamps | MakeContinuous() |
| Gap Filling | AbstractGapFilling | Interpolate small gaps | GeneralInterpolation() |
| Double Rotation | AbstractDoubleRotation | Align with mean wind | WindDoubleRotation() |
| MRD | AbstractMRD | Multiresolution decomposition | OrthogonalMRD() |
| Output | AbstractOutput | Write results | ICSVOutput(), NetCDFOutput() |

Quality Control

Default Bounds

qc = PhysicsBoundsCheck()
# Ux, Uy: [-100, 100] m/s
# Uz: [-50, 50] m/s
# Ts: [-50, 50] °C
# CO2: [0, ∞] ppm
# H2O: [0, ∞] mmol/mol
# T: [-50, 50] °C
# P: [0, ∞] Pa

Custom Bounds

qc = PhysicsBoundsCheck(
    Ux=Limit(-50, 50),
    Uy=Limit(-50, 50),
    Uz=Limit(-30, 30),
    Ts=Limit(-40, 50)
)

Despiking

Default Configuration

desp = SimpleSigmundDespiking()
# window_minutes=5.0
# spike_threshold=6.0 for all variables

Custom Groups

wind = VariableGroup("Wind", [:Ux, :Uy, :Uz], spike_threshold=6.0)
temp = VariableGroup("Sonic T", [:Ts], spike_threshold=6.0)
gas = VariableGroup("Gas", [:H2O], spike_threshold=5.0)

desp = SimpleSigmundDespiking(
    window_minutes=5.0,
    variable_groups=[wind, temp, gas]
)

Gap Filling

Linear Interpolation (default)

gap = GeneralInterpolation()
# max_gap_size=10
# method=Linear()

Cubic Spline

gap = GeneralInterpolation(
    max_gap_size=10,
    method=Cubic()
)

Custom Variables

gap = GeneralInterpolation(
    variables=[:Ux, :Uy, :Uz, :Ts],
    max_gap_size=20
)

Double Rotation

Default (30-minute blocks)

rot = WindDoubleRotation()

Custom Block Size

rot = WindDoubleRotation(block_duration_minutes=15.0)

MRD

Default Configuration

mrd = OrthogonalMRD()
# M=11 (2^11 = 2048 samples per block)
# shift=256 (samples between blocks)
# a=:Uz, b=:Ts

Custom Configuration

mrd = OrthogonalMRD(
    M=10,
    shift=128,
    a=:Uz,
    b=:Ts,
    gap_threshold_seconds=10.0,
    normalize=false,
    regular_grid=false
)

decompose!(mrd, hf, lf)
results = get_mrd_results(mrd)

if results !== nothing
    @show results.scales
    @show size(results.mrd)
    plot(results)
end

Input/Output

Read from .dat Files

input = DotDatDirectory(
    directory="/path/to/data",
    high_frequency_file_glob="*fast*",
    high_frequency_file_options=FileOptions(
        timestamp_column=:TIMESTAMP,
        time_format=dateformat"yyyy-mm-dd HH:MM:SS.s"
    )
)

hf, lf = read_data(input, CSAT3())

Write to Memory

out = MemoryOutput()
process!(pipeline, hf, lf)
hf_res, lf_res = Peddy.get_results(out)

Write to CSV

out = ICSVOutput("/path/to/output")

Write to NetCDF

out = NetCDFOutput("/path/to/output")

Write to Multiple Formats

out = OutputSplitter(
    ICSVOutput("/path/csv"),
    NetCDFOutput("/path/nc")
)

Sensors

CSAT3

sensor = CSAT3()
# Requires: Ux, Uy, Uz, Ts, diag_sonic

CSAT3B

sensor = CSAT3B()
# Requires: Ux, Uy, Uz, Ts, diag_sonic

IRGASON

sensor = IRGASON()
# Requires: Ux, Uy, Uz, Ts, CO2, H2O, diag_sonic, diag_irga

LICOR with Calibration

sensor = LICOR(
    calibration_coefficients=H2OCalibrationCoefficients(
        A=4.82004e3,
        B=3.79290e6,
        C=-1.15477e8,
        H2O_Zero=0.7087,
        H20_Span=0.9885
    )
)

Logging

Enable Logging

logger = ProcessingLogger()

pipeline = EddyPipeline(
    sensor=sensor,
    output=output,
    logger=logger
)

process!(pipeline, hf, lf)
write_processing_log(logger, "/path/to/log.csv")

Disable Logging (zero overhead)

logger = NoOpLogger()

pipeline = EddyPipeline(
    sensor=sensor,
    output=output,
    logger=logger
)

Debugging

Check Data Validity

check_data(hf, lf, sensor)

Run Steps Manually

quality_control!(qc, hf, lf, sensor)
despike!(desp, hf, lf)
fill_gaps!(gap, hf, lf)
rotate!(rot, hf, lf)
decompose!(mrd, hf, lf)
write_data(out, hf, lf)

Enable Debug Logging

using Logging

logger = ConsoleLogger(stderr, Logging.Debug)
with_logger(logger) do
    process!(pipeline, hf, lf)
end

Inspect Intermediate Results

n_nan_before = count(isnan, hf)
quality_control!(qc, hf, lf, sensor)
n_nan_after = count(isnan, hf)
@show n_nan_after - n_nan_before

Common Errors & Solutions

| Error | Solution |
|-------|----------|
| "High frequency data must have a Var dimension" | Use DimArray with Var and Ti dimensions |
| "Var dimension must have a Ux variable" | Check sensor requirements with needs_data_cols(sensor) |
| "Variable X not found" | Ensure variable exists in data before using it |
| "Not enough samples" | Provide more data or reduce block sizes |
| "Block size calculation failed" | Data too short for requested processing |

Performance Tips

  1. Use views for in-place modification: @view hf[Var=At(:Ux)]
  2. Disable expensive steps if not needed: Set to nothing
  3. Use NoOpLogger for production: Zero overhead
  4. Process in chunks for large datasets: Split by time period
  5. Use appropriate interpolation method: Linear() is fastest, Cubic() is most accurate

File Formats

CSV Output Structure

timestamp, Ux, Uy, Uz, Ts, ...
2024-01-01T00:00:00, 1.23, 0.45, -0.12, 20.5, ...
2024-01-01T00:00:00.050, 1.25, 0.43, -0.11, 20.6, ...

NetCDF Output Structure

Dimensions:
  time: 1000000
  variables: 8

Variables:
  time (time): datetime64
  Ux (variables, time): float64
  Uy (variables, time): float64
  ...

Useful Functions

# Mean ignoring NaN
Peddy.mean_skipnan(arr)

# Get results from MemoryOutput
hf_res, lf_res = Peddy.get_results(output)

# Get MRD results
results = get_mrd_results(mrd)

# Check if logging enabled
is_logging_enabled(logger)

# Get variable metadata
meta = metadata_for(:Ux)

Resources