-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathschema_validata.py
More file actions
4223 lines (3586 loc) · 159 KB
/
schema_validata.py
File metadata and controls
4223 lines (3586 loc) · 159 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os # Standard library for interacting with the operating system
import sys
import shutil # Standard library for high-level file operations
import json # Standard library for working with JSON objects
import ast # Standard library for parsing and processing Python abstract syntax trees
import math # Standard library for mathematical functions
import hashlib # Standard library for generating hash values
import chardet # Library for character encoding detection
import re # Standard library for regular expressions
import warnings # Standard library for issuing warning messages
from datetime import datetime # Standard library for working with dates and times
from dateutil import parser as dt_parser # Library for parsing dates from strings
import pandas as pd # Library for data manipulation and analysis
import numpy as np # Library for numerical operations
try:
import pyspark
import pyspark.pandas as ps # Library for data manipulation and analysis with Spark
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, FloatType, StringType, DateType, TimestampType, BooleanType
from pyspark.sql.dataframe import DataFrame as SparkDataFrame # Alias for Spark DataFrame class/type
from pyspark.sql import SparkSession
pyspark_available = True
except ImportError:
print("pyspark.pandas is not available in the session.")
pyspark_available = False
from sqlite3 import connect # Standard library for creating and managing SQLite3 databases
import sqlparse # Library for parsing SQL queries
import sql_metadata # Library for advanced parsing of SQL queries
from sqllineage.runner import LineageRunner # More robust libary for itentifying sql parts
import sqlglot # Most robust library for parsing and analyzing SQL queries
from sqlglot.expressions import Star, Select, Table, With
#----------------------------------------------------------------------------------
# List of warnings to silence
warnings_to_ignore = [
{"category": UserWarning},
{"message": "^Columns.*"},
{"category": FutureWarning}
]
# Suppress the warnings
for warning in warnings_to_ignore:
if "category" in warning:
warnings.filterwarnings("ignore", category=warning["category"])
elif "message" in warning:
warnings.filterwarnings("ignore", message=warning["message"])
try:
# Ignore future warning on silent down casting (code assumes new method)
pd.set_option('future.no_silent_downcasting', True)
except:
pass
#----------------------------------------------------------------------------------
# Config class
class Config:
    """
    Configuration class for storing package constants and settings.

    This class provides a central location for managing configuration settings
    and constants used throughout the schema_validata package. All settings are
    class-level attributes, so they can be read or overridden without
    instantiating the class.

    Attributes:
        NA_VALUES (list): List of strings representing values to be treated as NaN.
        NA_PATTERNS (list): List of regex patterns for identifying values to be treated as NaN.
        DATE_COL_KEYWORDS (list): Column-name keywords suggesting date/time content.
        COMMON_TIMESTAMPS (list): strptime formats for time-only values.
        COMMON_DATETIMES (list): strptime formats for date/datetime values.
        NUMERIC_SYMBOLS (list): Symbols stripped before numeric conversion.

    Example:
        Config.DATA_DICT_PRIMARY_KEY = 'Name'  # Changing a configuration attribute
        print(Config.NA_VALUES)                # Accessing a configuration attribute
    """
    # Spark support is optional: USE_PYSPARK / SPARK_SESSION only exist when
    # the pyspark import at module load succeeded (AttributeError otherwise,
    # matching the original behavior).
    if pyspark_available:
        USE_PYSPARK = True
        SPARK_SESSION = SparkSession.builder.appName("schema_validata").getOrCreate()
    # Data dictionary schema
    DATA_DICT_SCHEMA = {
        "field_name": "object",
        "required": "object",
        "data_type": "object",
        "allow_null": "object",
        "length": "object",
        "range_min": "float",
        "range_max": "float",
        "regex_pattern": "object",
        "unique_value": "object",
        "allowed_value_list": "object"
    }
    # Data integrity schema
    DATA_INTEGRITY_SCHEMA = {
        'Primary Table': "object",
        'SQL Error Query': "object",
        'Level': "object",
        'Message': "object"
    }
    # Determines whether a data integrity SQL check returns only explicitly
    # referenced columns (True) or all columns when * is used (False).
    # The most unique (primary) key is also included for row reference.
    # NOTE(review): this flag was previously assigned twice with the same
    # value; the duplicate assignment has been removed.
    DATA_INTRGTY_EXPL_COLS_ONLY = False
    # Data dictionary schema primary key field
    DATA_DICT_PRIMARY_KEY = "field_name"
    # Overview error message string formats
    SCHEMA_ERROR_TEMPLATES = {
        'required_column'     : "Column by name '{col}' is required, but missing in dataset.",
        'optional_column'     : "Column by name '{col}' is missing in the dataset, but is optional.",
        'allow_null'          : "Column '{col}' data has {count} null values, null values are not allowed.",
        'data_type'           : "Column '{col}' data type: {observed} does not match the required data type: {expected} .",
        'unique_value'        : "Column '{col}' values must be unique. Found {count} duplicate values in dataset column .",
        'length'              : "Column '{col}' max string of: {observed} exceeds the max allowed of: {expected} .",
        'range_min'           : "Column '{col}' min value of: {observed} is less than the minimum allowed value of: {expected} .",
        'range_max'           : "Column '{col}' max value of: {observed} exceeds the maximum allowed value of: {expected} .",
        'allowed_value_list'  : "Column '{col}' contains values that are not allowed: {err_vals} .",
        'regex_pattern'       : "Column '{col}' contains values which do not match the allowed format/pattern ."
    }
    # Message level used depending on whether the column is required
    SCHEMA_REQUIRED_MESSAGE_LEVELS = {
        True  : "Error",
        False : "Informational/Warning",
    }
    DATE_COL_KEYWORDS = [
        'date', 'time', 'datetime', 'timestamp', 'dob', 'dt',
        'created', 'modified', 'updated', 'birthday', 'event_time'
    ]
    # Common US & ISO timestamp formats
    COMMON_TIMESTAMPS = [
        "%H:%M:%S",        # 24-Hour Time (most common technical)
        "%I:%M:%S %p",     # 12-Hour Time to seconds with AM/PM
        "%I:%M:%S%p",      # 12-Hour Time to seconds with AM/PM (no space)
        "%I:%M %p",        # 12-Hour Time to mins with AM/PM
        "%I:%M%p",         # 12-Hour Time to mins with AM/PM (no space)
        "%H:%M",           # 24-Hour Time, no seconds
        "%I %p",           # 12-Hour, hour only with AM/PM
        "%H",              # 24-Hour, hour only
    ]
    # Common US & ISO date/datetime formats (ordered by specificity and likelihood)
    # NOTE(review): the '%-m'/'%-d' directives are glibc strftime extensions and
    # are not valid strptime directives; callers that try these formats must
    # catch ValueError (infer_datetime_column does).
    COMMON_DATETIMES = [
        # ISO & HIGH-PRECISION DATETIMES
        "%Y-%m-%dT%H:%M:%S%z",      # ISO with Offset: 2026-01-06T15:00:00+0000
        "%Y-%m-%dT%H:%M:%SZ",       # ISO Zulu: 2026-01-06T15:00:00Z
        "%Y-%m-%d %H:%M:%S.%f",     # ISO with Microseconds
        "%Y-%m-%d %H:%M:%S",        # ISO Standard: 2026-01-06 15:00:00
        # 4-DIGIT YEAR DATETIMES (US & EU)
        "%m/%d/%Y %H:%M:%S",        # 01/06/2026 15:00:00
        "%-m/%-d/%Y %H:%M:%S",      # 1/6/2026 15:00:00
        "%d/%m/%Y %H:%M:%S",        # 06/01/2026 15:00:00
        "%-d/%-m/%Y %H:%M:%S",      # 6/1/2026 15:00:00
        "%B %d, %Y %H:%M:%S",       # January 06, 2026 15:00:00
        # 2-DIGIT YEAR DATETIMES (US & EU)
        "%m/%d/%y %H:%M:%S",        # 01/06/26 15:00:00
        "%-m/%-d/%y %H:%M:%S",      # 1/6/26 15:00:00
        "%d/%m/%y %H:%M:%S",        # 06/01/26 15:00:00
        "%-d/%-m/%y %H:%M:%S",      # 6/1/26 15:00:00
        # 4-DIGIT YEAR DATES (No Time)
        "%Y-%m-%d",                 # 2026-01-06 (Standard ISO)
        "%m/%d/%Y",                 # 01/06/2026
        "%-m/%-d/%Y",               # 1/6/2026
        "%-m/%d/%Y",                # 1/06/2026
        "%m/%-d/%Y",                # 01/6/2026
        "%d/%m/%Y",                 # 06/01/2026
        "%-d/%-m/%Y",               # 6/1/2026
        "%-d/%m/%Y",                # 6/01/2026
        "%d/%-m/%Y",                # 06/1/2026
        "%b-%d-%Y",                 # Jan-06-2026
        "%B %d, %Y",                # January 06, 2026
        "%d-%m-%Y",                 # 06-01-2026
        # 2-DIGIT YEAR DATES (No Time)
        "%m/%d/%y",                 # 01/06/26
        "%-m/%-d/%y",               # 1/6/26
        "%-m/%d/%y",                # 1/06/26
        "%m/%-d/%y",                # 01/6/26
        "%d/%m/%y",                 # 06/01/26
        "%-d/%-m/%y",               # 6/1/26
        "%-d/%m/%y",                # 6/01/26
        "%d/%-m/%y",                # 06/1/26
        "%y-%m-%d",                 # 26-01-06
        # PARTIAL DATES
        "%Y-%m",                    # 2026-01
        "%-m/%Y",                   # 1/2026
        "%B %Y",                    # January 2026
        "%b %Y",                    # Jan 2026
    ]
    # Common null/missing value representations
    COMMON_NA_VALUES = [
        '',            # Empty string
        ' ',           # Single space
        'N/A',         # Common missing value
        'n/a',         # Lowercase n/a
        'NA',          # Common missing value
        'na',          # Lowercase na
        'NULL',        # Uppercase NULL
        'Null',        # Capitalized Null
        'null',        # Lowercase null
        'None',        # String None
        None,          # Python None
        np.nan,        # NumPy NaN
        'NaN',         # Not a Number
        'nan',         # Lowercase nan
        '-NaN',        # Negative NaN
        '-nan',        # Negative NaN (lowercase)
        '#N/A',        # Excel error
        '#NA',         # Excel error
        '<NA>',        # Pandas string for missing value
        '#REF!',       # Excel error
        '#VALUE!',     # Excel error
        '#DIV/0!',     # Excel division by zero error
        'missing',     # Lowercase missing
        'Missing',     # Capitalized missing
    ]
    # Additional values unique to pandas >= 1.5
    NA_VALUES_v1_5 = [
        '#N/A N/A',    # Less standard combination
        '-1.#IND',     # Specific float representation
        '-1.#QNAN',    # Specific float representation
        '1.#IND',      # Specific float representation
        '1.#QNAN',     # Specific float representation
    ]
    # BUG FIX: compare version components numerically. The previous string
    # comparison (pd.__version__ >= '1.5') is lexicographic and would
    # misclassify versions such as '10.0'.
    _PD_VERSION_TUPLE = tuple(int(part) for part in re.findall(r'\d+', pd.__version__)[:2])
    if _PD_VERSION_TUPLE >= (1, 5):
        NA_VALUES = COMMON_NA_VALUES + NA_VALUES_v1_5
    else:
        NA_VALUES = COMMON_NA_VALUES
    # Standard pattern reps for nulls, values will be converted to nulls
    NA_PATTERNS = [
        r'^\s*NOT\s{0,1}(?:\s|_|-|/|\\|/){1}\s{0,1}AVAILABLE\s*$',
        r'^\s*N\s{0,1}(?:\s|_|-|/|\\|/){1}\s{0,1}A\s*$',
        r'^\s*(?:\s|_|-|/|\\|/){1}\s*$',
        r'^\s+$'
    ]
    # List of symbol characters to remove from string values before attempting numeric conversion.
    # Includes common currency signs and scaling indicators (percent, per mille).
    NUMERIC_SYMBOLS = [
        '$', '€', '£', '¥', '₹', '₽',  # Currency Signs
        '%', '‰',                      # Scaling Indicators (Percent, Per Mille)
        ','                            # Thousands separator
    ]
    # Dictionary to hold custom variables for dynamic sql queries
    SQL_STATEMENT_VARS = {}
class jsonEncoder(json.JSONEncoder):
    """Custom JSON encoder that serializes NumPy data types (integer,
    floating, bool, and arrays) for compatibility with JSON.

    Inherits from `json.JSONEncoder` and overrides `default` to convert
    NumPy scalars to their built-in Python equivalents and NumPy arrays
    to plain lists.
    """

    def default(self, obj):
        """
        Overrides the default method of JSONEncoder to handle specific object types.

        Parameters
        ----------
        obj:
            The object to be serialized.

        Returns
        -------
        A JSON-serializable representation of the object.
        """
        if isinstance(obj, np.integer):
            # NumPy integer types (e.g., int64) -> built-in int.
            return int(obj)
        if isinstance(obj, np.floating):
            # NumPy floating-point types (e.g., float64) -> built-in float.
            return float(obj)
        if isinstance(obj, np.bool_):
            # NumPy booleans are not a bool subclass; convert explicitly.
            return bool(obj)
        if isinstance(obj, np.ndarray):
            # BUG FIX: return a plain list (NOT self.encode(...)). Returning a
            # pre-encoded JSON string caused arrays to be serialized as a
            # double-encoded string instead of a JSON array.
            return obj.tolist()
        return super().default(obj)
#----------------------------------------------------------------------------------
def db_path_to_local(path):
    """Translate a Databricks DBFS path into a local OS file path.

    Parameters
    ----------
    path : str
        DataBricks dbfs file storage path.

    Returns
    ----------
    str
        Local os file path.
    """
    # Mount-point paths are exposed locally under the /dbfs root.
    if path.startswith('/mnt'):
        path = '/dbfs' + path
    # A leading 'dbfs:' URI scheme maps to the /dbfs local mount.
    return re.sub(r'^(dbfs:)', r'/dbfs', path)
#----------------------------------------------------------------------------------
def to_dbfs_path(path):
    """Translate a local OS file path into a Databricks DBFS path.

    Parameters
    ----------
    path : str
        Local os file path.

    Returns
    ----------
    str
        DataBricks dbfs file storage path.
    """
    # Mount-point paths get the dbfs: URI scheme prepended directly.
    if path.startswith('/mnt'):
        return 'dbfs:' + path
    # Unity Catalog volume paths ('/Volumes/...') are left untouched; only a
    # leading /dbfs local mount root is rewritten to the dbfs: scheme.
    if path.startswith('/dbfs') and not path.startswith('/Volumes'):
        return re.sub(r'^(/dbfs)', r'dbfs:', path)
    return path
#----------------------------------------------------------------------------------
def get_byte_units(size_bytes):
    """Convert a byte count into the largest sensible decimal (power-of-1000) unit.

    Parameters
    ----------
    size_bytes : int
        Numeric value representing bytes.

    Returns
    -------
    str
        String representing the value and the largest unit size.
        Example: '200 : GB'
    """
    units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    if size_bytes == 0:
        return '0 : B'
    # Largest power of 1000 that fits into size_bytes selects the unit.
    exponent = int(math.floor(math.log(size_bytes, 1000)))
    scaled = round(size_bytes / math.pow(1000, exponent), 2)
    return f'{scaled} : {units[exponent]}'
# ----------------------------------------------------------------------------------
def get_md5_hash(file_path):
    """Compute the MD5 hex digest of a file's contents.

    Parameters
    ----------
    file_path : str
        Path to the file.

    Returns
    -------
    str
        MD5 hash string on success, or a human-readable error message
        string on failure (this function does not raise).
    """
    digest = hashlib.md5()
    try:
        with open(file_path, "rb") as fh:
            # Stream in 8 KB chunks so large files never load fully into memory.
            for chunk in iter(lambda: fh.read(8192), b''):
                digest.update(chunk)
        return digest.hexdigest()
    except FileNotFoundError:
        return f"File not found: {file_path}"
    except PermissionError:
        return f"Permission error reading file: {file_path}"
    except Exception as e:
        return f"An error occurred: {str(e)}"
# ----------------------------------------------------------------------------------
def get_spreadsheet_metadata(file_path):
    """Collect general file metadata for each sheet of a CSV or Excel file.

    Parameters
    ----------
    file_path : str
        Path to the CSV or Excel file.

    Returns
    -------
    dict
        Mapping of sheet name to a metadata dict (path, name, type, size,
        row/column counts, MD5 hash, created/modified timestamps).
        On failure a human-readable error string is returned instead.
    """
    try:
        file_name = os.path.basename(file_path)
        _, extension = os.path.splitext(file_name)
        # File-system level metadata.
        # NOTE(review): st_ctime is creation time only on some platforms
        # (metadata-change time on most Unixes) — confirm if this matters.
        stats = os.stat(file_path)
        created = datetime.fromtimestamp(stats.st_ctime).isoformat()
        modified = datetime.fromtimestamp(stats.st_mtime).isoformat()
        # One DataFrame per sheet (a CSV yields a single entry).
        sheets = read_csv_or_excel_to_df(file_path, infer=True, multi_sheets=True)
        md5_digest = get_md5_hash(file_path)
        file_meta = {}
        for sheet_name, df in sheets.items():
            file_meta[sheet_name] = {
                'file_path': file_path,
                'file_name': file_name,
                'file_type': extension,
                'file_size_bytes': f'{stats.st_size:,}',
                'file_size_memory_unit': get_byte_units(int(stats.st_size)),
                'record_qty': f'{len(df):,}',
                'column_qty': f'{len(df.columns):,}',
                'file_md5_hash': md5_digest,
                'created': created,
                'modified': modified
            }
        return file_meta
    except FileNotFoundError:
        return f"File not found: {file_path}"
    except PermissionError:
        return f"Permission error reading file: {file_path}"
    except Exception as e:
        return f"An error occurred: {str(e)}"
# ----------------------------------------------------------------------------------
def is_numeric_type(value):
    """
    Checks if a value is a common numeric data type in
    pandas, NumPy, or Python.

    Parameters:
    ----------
    value: The value to check.

    Returns:
    -------
    bool: True if the value is numeric, False otherwise.
    """
    if isinstance(value, (int, float, complex)):
        return True
    try:
        # BUG FIX: np.issubdtype raises TypeError for types NumPy cannot
        # interpret as a dtype (e.g. NoneType or arbitrary objects); those
        # are simply not numeric, so report False instead of raising.
        return bool(np.issubdtype(type(value), np.number))
    except TypeError:
        return False
# ----------------------------------------------------------------------------------
def downcast_ints(value):
    """
    Downcast a numeric scalar to its smallest lossless representation,
    preferring an integer type.

    Parameters
    ----------
    value: The numeric value to downcast.

    Returns
    -------
    The downcast value when a lossless integer (or, failing that, float)
    representation exists; otherwise the original value unchanged.
    Non-numeric inputs are returned as-is.
    """
    try:
        # Non-numeric values pass through untouched.
        if not pd.api.types.is_number(value):
            return value
        # Try integer first, then float — first lossless match wins.
        for target in ('integer', 'float'):
            candidate = pd.to_numeric([value], downcast=target)[0]
            if not pd.isnull(candidate) and candidate == value:
                return candidate
    except Exception:
        pass
    return value
# ----------------------------------------------------------------------------------
def get_best_uid_column(df, preferred_column=None):
    """
    Identifies the column with the most unique values to serve as a primary key.
    This function evaluates columns based on their data type and uniqueness to
    select the best candidate for a unique identifier (UID). It prioritizes
    columns that are fully unique, preferring (in order) UUID-like strings,
    integers, other strings, then floats. In case of a tie, a preferred column
    is selected. A column name is always returned.

    Parameters
    ----------
    df : pandas.DataFrame, pyspark.pandas.DataFrame, or pyspark.sql.DataFrame
        The input DataFrame to be analyzed.
    preferred_column : str, optional
        A column name to be chosen in the event of a tie for the most
        unique values. The default is None.

    Returns
    -------
    str
        The name of the column identified as the best UID.
        If no suitable columns are found, it returns the preferred column
        or the column with the highest distinct-value count as a fallback.

    Raises
    ------
    ValueError
        If the input DataFrame is empty (has no columns).
    """
    # Normalize to the pyspark.pandas API so the same code path handles
    # pandas, pyspark.pandas, and Spark SQL frames.
    df = convert_to_pyspark_pandas(df)
    if df.columns.empty:
        raise ValueError("DataFrame has no columns to select from.")
    # A single pass to classify all columns into their respective tiers,
    # ordered by desirability as a UID.
    uuid_candidates = []
    int_candidates = []
    string_candidates = []
    float_candidates = []
    total_len = len(df)
    for col in df.columns:
        dtype = str(df[col].dtype)
        # Fully unique == one distinct value per row.
        is_unique = df[col].nunique() == total_len
        if dtype in ['string', 'object']:
            non_null_values = df[col].dropna()
            # Check for UUID-like strings (36-character length).
            # NOTE(review): length==36 is a heuristic; any uniform 36-char
            # string column matches, not only真 UUIDs — confirm acceptable.
            if not non_null_values.empty and (non_null_values.str.len() == 36).all():
                if is_unique:
                    uuid_candidates.append(col)
            elif is_unique:
                string_candidates.append(col)
        # check_all_int is a project helper (not visible here); presumably it
        # reports whether an object column is entirely integer-like — verify.
        elif dtype.startswith('int') or check_all_int(df[col]) == 'Int64':
            if is_unique:
                int_candidates.append(col)
        elif dtype.startswith('float') or check_all_int(df[col]) == 'Float64':
            if is_unique:
                float_candidates.append(col)
    # First Pass: Find a fully unique column based on the priority order
    for candidates in [uuid_candidates, int_candidates, string_candidates, float_candidates]:
        if candidates:
            # Check for a preferred column tie-breaker within this tier
            if preferred_column and preferred_column in candidates:
                return preferred_column
            return candidates[0]
    # Second Pass: If no fully unique column exists, find the most unique one
    if preferred_column and preferred_column in df.columns:
        return preferred_column
    return df.nunique().idxmax()
# ----------------------------------------------------------------------------------
def eval_nested_string_literals(data):
    """
    Recursively walk a nested dictionary (or JSON string) and evaluate string
    representations of Python literals (e.g. lists, dicts, tuples, numbers)
    into their actual Python counterparts. The structure is modified in place;
    values that cannot be evaluated are left untouched.

    Parameters:
    -----------
    data : dict or str
        The nested dictionary to iterate through, or a JSON string to be parsed.

    Returns:
    --------
    dict
        The modified dictionary with string representations replaced by
        evaluated values.

    Raises:
    -------
    ValueError
        If the provided data is not a valid JSON string.
    """
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON string: {e}")
    for key, raw in data.items():
        if isinstance(raw, dict):
            # Nested dicts are handled recursively and mutated in place.
            eval_nested_string_literals(raw)
            continue
        # Surrounding quotes are stripped before evaluation; non-strings
        # pass through unchanged.
        candidate = raw.strip('"\'') if isinstance(raw, str) else raw
        try:
            parsed = ast.literal_eval(candidate)
        except (SyntaxError, ValueError):
            parsed = candidate
        # Only write back when evaluation actually produced a new value
        # (quote-stripping alone does not count, matching prior behavior).
        if parsed != candidate:
            data[key] = parsed
    return data
# ----------------------------------------------------------------------------------
def remove_pd_df_newlines(df, replace_char=''):
    """
    Replace newline characters ('\\n') in every string value of a pandas
    or pyspark.pandas DataFrame.

    Parameters:
    -----------
    df : pandas.DataFrame or pyspark.pandas.DataFrame
        The DataFrame to process.
    replace_char : str, optional
        Replacement for newline characters. Defaults to '' (empty string).

    Returns:
    --------
    pandas.DataFrame or pyspark.pandas.DataFrame
        The DataFrame with newlines replaced in string values.
    """
    # DataFrame.replace with regex=True covers both pandas and
    # pyspark.pandas frames with the same call.
    return df.replace('\n', replace_char, regex=True)
# ----------------------------------------------------------------------------------
def conditional_numeric_conversion(
    df,
    null_values=Config.NA_VALUES,
    symbols_to_strip=Config.NUMERIC_SYMBOLS
):
    """
    Convert object (string) columns of a DataFrame to a numeric dtype when
    every non-null value parses as a number after stripping currency and
    scaling symbols.

    Parameters
    ----------
    df : pandas.DataFrame
        The DataFrame to process.
    null_values : list, optional
        Values to treat as missing; they are blanked out before conversion.
        Defaults to Config.NA_VALUES.
    symbols_to_strip : list of str, optional
        Symbols removed from string values before attempting numeric
        conversion. Defaults to Config.NUMERIC_SYMBOLS.

    Returns
    -------
    pandas.DataFrame
        A copy of the DataFrame with qualifying columns converted to numeric.

    Notes
    -----
    - Only object-dtype columns are considered.
    - A column is converted only when the whole cleaned column converts;
      any unparseable value leaves the original column untouched.
    - Null representations survive as missing values after conversion.
    """
    result = df.copy()
    for col in result.columns:
        if result[col].dtype != 'object':
            continue
        # Blank out configured null representations first.
        candidate = result[col].replace(null_values, '', regex=False)
        # Strip symbols from the stringified values.
        candidate = candidate.astype(str)
        for symbol in symbols_to_strip:
            candidate = candidate.str.replace(symbol, '', regex=False)
        # Empty strings become <NA> so they convert to missing values.
        candidate = candidate.replace('', pd.NA)
        try:
            result[col] = pd.to_numeric(candidate)
        except Exception:
            # Conversion failed: keep the original column as-is.
            continue
    return result
# ----------------------------------------------------------------------------------
def column_is_timestamp(df, column_name, time_format):
    """
    Determine whether every non-null value in a DataFrame column parses as
    a time using the given strptime format.

    Parameters:
    -----------
    df : pandas.DataFrame or pyspark.pandas.DataFrame
        The DataFrame containing the column.
    column_name : str
        The name of the column to check.
    time_format : str
        The expected string time format.

    Returns:
    --------
    bool
        True if all non-null values parse as time; False otherwise
        (including when the column has no non-null values).
    """
    # pyspark.pandas frames are materialized to pandas for row-wise parsing.
    if 'pyspark.pandas.frame.DataFrame' in str(type(df)):
        df = df.to_pandas()
    # Whitespace-only strings are treated as nulls and dropped.
    values = (
        df[column_name]
        .astype(str)
        .replace(r'^\s+$', pd.NA, regex=True)
        .dropna()
    )
    if len(values) == 0:
        return False

    def _parses_as_time(text):
        if not isinstance(text, str):
            return False
        try:
            datetime.strptime(text, time_format).time()
            return True
        except ValueError:
            return False

    return values.apply(_parses_as_time).all()
# ----------------------------------------------------------------------------------
def is_likely_datetime_col(colname):
    """
    Heuristically decide whether a column name suggests date/time content.

    Parameters
    ----------
    colname : str
        The column name to check.

    Returns
    -------
    bool
        True if the lowercased name contains any keyword from
        Config.DATE_COL_KEYWORDS.
    """
    lowered = str(colname).lower()
    for keyword in Config.DATE_COL_KEYWORDS:
        if keyword in lowered:
            return True
    return False
# ----------------------------------------------------------------------------------
def infer_datetime_column(df, column_name):
    """
    Attempts to convert a DataFrame column to datetime type when appropriate.
    For numeric columns (potential Excel serial dates), conversion is only
    attempted if the column name is suggestive of a date or time field.
    For string columns, strict formats defined by Config are always tested;
    flexible parsing via dateutil is only used if the column name is suggestive.

    Parameters
    ----------
    df : pandas.DataFrame or pyspark.pandas.DataFrame
        DataFrame containing the column to evaluate for datetime conversion.
    column_name : str
        The column name to attempt conversion.

    Returns
    -------
    pandas.Series or pyspark.pandas.Series
        Converted column (dtype datetime64) if inference was successful;
        otherwise, the original column unchanged.
    """
    # pyspark.pandas columns are materialized to pandas for the parsing work,
    # then wrapped back into a ps.Series on return.
    is_spark_pandas = 'pyspark.pandas.frame.DataFrame' in str(type(df))
    series_for_processing = df[column_name].to_pandas() if is_spark_pandas else df[column_name]
    orig_series = series_for_processing.copy()
    # Return immediately if the column is already of a datetime type
    if pd.api.types.is_datetime64_any_dtype(series_for_processing):
        return df[column_name] if is_spark_pandas else orig_series
    # Handle numeric columns (Excel serial dates) only if the column name is suggestive of a date/time attribute
    if pd.api.types.is_numeric_dtype(series_for_processing):
        if is_likely_datetime_col(column_name):
            non_null_count = series_for_processing.dropna().count()
            if non_null_count > 0:
                # Plausible Excel serial range: >1 and <100000 (~year 2173).
                is_plausible_date = (series_for_processing > 1).all() and (series_for_processing < 100000).all()
                if is_plausible_date:
                    try:
                        # 1899-12-30 is the Excel (Windows) serial-date epoch.
                        converted_series = pd.to_datetime(
                            series_for_processing,
                            origin='1899-12-30', unit='D', errors='coerce'
                        )
                        successfully_converted_count = converted_series.dropna().count()
                        # Accept only when >=98% of non-null values converted.
                        if successfully_converted_count / non_null_count >= 0.98:
                            return ps.Series(converted_series) if is_spark_pandas else converted_series
                    except Exception:
                        pass
        # If the column name is not suggestive or conversion fails, return the original column
        return df[column_name] if is_spark_pandas else orig_series
    # For string columns, always attempt conversion using strict formats; use flexible parsing only if the column name is suggestive
    elif pd.api.types.is_string_dtype(series_for_processing):
        non_null_values = series_for_processing.dropna()
        non_null_count = non_null_values.count()
        if non_null_count > 0:
            # Attempt conversion using predefined strict date formats.
            # getattr guards allow either Config list to be absent
            # (COMMON_DATES is not defined in the visible Config).
            for fmt in getattr(Config, "COMMON_DATES", []) + getattr(Config, "COMMON_DATETIMES", []):
                try:
                    converted_series = pd.to_datetime(non_null_values, format=fmt, errors='raise')
                    successfully_converted_count = converted_series.notnull().sum()
                    # Strict formats require a 100% match of non-null values.
                    if successfully_converted_count == non_null_count:
                        # Re-expand onto the full index so nulls stay aligned.
                        combined_series = pd.Series(index=series_for_processing.index, dtype='datetime64[ns]')
                        combined_series.loc[converted_series.index] = converted_series
                        return ps.Series(combined_series) if is_spark_pandas else combined_series
                except (ValueError, TypeError):
                    continue
            # If strict format conversion fails, apply flexible parsing only for suggestive column names
            if is_likely_datetime_col(column_name):
                def try_dateutil_parser(x):
                    try:
                        # yearfirst=False: interpret ambiguous dates like '01/02/2020' as month/day/year (default US style)
                        # dayfirst=False: do not prioritize day over month in ambiguous dates; month comes first
                        return dt_parser.parse(x, yearfirst=False, dayfirst=False)
                    except (ValueError, TypeError):
                        return None
                converted_series = non_null_values.apply(try_dateutil_parser)
                combined_series = pd.Series(index=series_for_processing.index, dtype='datetime64[ns]')
                combined_series.loc[converted_series.index] = converted_series
                successfully_converted_count = combined_series.dropna().count()
                # Flexible parsing accepts a >=98% conversion rate.
                if successfully_converted_count / non_null_count >= 0.98:
                    return ps.Series(combined_series) if is_spark_pandas else combined_series
        # If conversion is not successful, return the original column
        return df[column_name] if is_spark_pandas else orig_series
    # For all other column types, return the original column
    return df[column_name] if is_spark_pandas else orig_series
# ----------------------------------------------------------------------------------
def detect_file_encoding(file_path):
    """Detect a text file's character encoding using the chardet library.

    Useful for determining the appropriate encoding when reading files that
    do not declare one; the file's full contents are analyzed to identify
    the most likely character encoding scheme.

    Parameters:
    ----------
    file_path (str):
        The path to the target file.

    Returns:
    ----------
    str:
        The detected character encoding, or None (letting pandas fall back
        to its default, 'utf-8') when chardet's confidence is 50% or lower.

    Raises:
    ----------
    OSError:
        If the specified file cannot be opened for reading.
    """
    try:
        with open(file_path, 'rb') as fh:
            raw_bytes = fh.read()
    except OSError as e:
        raise OSError(f"Error opening file: {file_path}. {e}")
    detection = chardet.detect(raw_bytes)
    if detection['confidence'] > 0.5:
        return detection['encoding']
    print(f"Encoding confidence for '{file_path}' is low (< 50%). Using pandas default.")
    return None
#----------------------------------------------------------------------------------
def read_spreadsheets(
    file_path,
    sheet_name=None,
    dtype=None,
    rm_newlines=True,
    replace_char=" ",
    na_values=None,
    parse_dates=None,
    strip_num_symbols=False
):
    """
    Read an Excel (.xlsx, .xls) or CSV (.csv) file into a pandas DataFrame,
    with optional newline removal, numeric-symbol stripping, and whitespace
    trimming.

    Parameters
    ----------
    file_path : str
        The path to the data file. Supports local or DBFS paths.
    sheet_name : str or int or None, optional
        Name or index of the Excel sheet to read; None reads the first
        sheet. Ignored for CSV files.
    dtype : dict or str or None, optional
        Data type for data or columns. If None, types are inferred.
    rm_newlines : bool, optional
        If True (default), newline characters in string cells are replaced
        with `replace_char`.
    replace_char : str, optional
        Replacement for newline characters. Default is a single space.
    na_values : scalar, str, list-like, or dict, optional
        Additional strings to recognize as NA/NaN, passed through to pandas.
    parse_dates : bool or list or dict or None, optional
        Columns to parse as dates; see pandas.read_csv/read_excel.
    strip_num_symbols : bool, optional
        If True, strips common numeric symbols (e.g. $, %, ,) and converts
        qualifying columns to numeric. Default is False.

    Returns
    -------
    pandas.DataFrame
        The loaded DataFrame with the requested cleanup applied. Column
        names and string values are stripped of surrounding whitespace.

    Raises
    ------
    ValueError
        If the file extension is not .xlsx, .xls, or .csv.
    """
    _, ext = os.path.splitext(os.path.basename(file_path))
    # DBFS URIs are rewritten to their local mount equivalents.
    local_path = db_path_to_local(file_path)
    if ext in (".xlsx", ".xls"):
        df = pd.read_excel(
            local_path,
            sheet_name=sheet_name if sheet_name is not None else 0,
            dtype=dtype,
            na_values=na_values,
            parse_dates=parse_dates,
        )
    elif ext == ".csv":
        # CSVs get best-effort encoding detection before parsing.
        df = pd.read_csv(
            local_path,
            dtype=dtype,
            na_values=na_values,
            encoding=detect_file_encoding(local_path),
            parse_dates=parse_dates,
        )
    else:
        raise ValueError(f"Unsupported file extension: {ext}")
    if rm_newlines:
        df = remove_pd_df_newlines(df, replace_char=replace_char)
    if strip_num_symbols:
        df = conditional_numeric_conversion(df, null_values=na_values)
    # Trim surrounding whitespace from column names and string cell values.
    df.columns = df.columns.str.strip()
    df = df.apply(lambda col: col.str.strip() if col.dtype == "object" else col)
    return df
# ----------------------------------------------------------------------------------
def xlsx_tabs_to_pd_dataframes(file_path,
infer=True,
rm_newlines=True,
replace_char="",