Improve implementation of storage of transpiled files #1409

Status: Open. Wants to merge 42 commits into base: main.
Commits (42), all by ericvergnaud:

7beeaff  Jan 6, 2025   Manually merge transpile-using-lsp-saved into transpile-unsing-lsp
879407c  Jan 6, 2025   fix merge issue
b9e663e  Jan 6, 2025   move lineage stuff out of LSP api
3bc5fb2  Jan 6, 2025   refactor TranspileError and TranspileStatus
8cb310f  Jan 6, 2025   fix refactoring issues
8ac675a  Dec 19, 2024  drop work in progress
819aad5  Dec 19, 2024  add range
e8b7b3c  Dec 19, 2024  fix failing test
1c36787  Dec 19, 2024  formatting
203573e  Jan 6, 2025   fix merge issue
47e6227  Dec 20, 2024  convert diagnostics into TranspileErrors
8a58089  Dec 20, 2024  display transpile errors in console
40b1eff  Dec 20, 2024  formatting
6f38994  Dec 20, 2024  fix failing tests
e301186  Dec 20, 2024  fix merge issue
b02795f  Dec 26, 2024  successfully prints transpile errors in the console
5ee7117  Dec 26, 2024  formatting + failing tests
c4836f3  Dec 26, 2024  fix failing tests
f7ad038  Jan 7, 2025   drop no longer relevant tests
8803594  Dec 27, 2024  add error-file flag to CLI and config
ee18e9c  Dec 27, 2024  write errors to tmp file
b54e627  Dec 27, 2024  simplify and format
39cf86d  Jan 7, 2025   Merge branch 'feature/specify-error-file/configure-transpile-errors-f…
561515f  Dec 27, 2024  factorize test code
0bbfd08  Dec 27, 2024  simplify and format
ae30790  Dec 27, 2024  simplify and format
f4eddda  Dec 27, 2024  remove duplicate test
8010899  Dec 27, 2024  fix typo
eef26a8  Dec 27, 2024  check generated files
ea0bf62  Dec 27, 2024  refactor
94783de  Dec 27, 2024  formatting
5baeb69  Jan 8, 2025   bump max args
ea9ff6f  Jan 8, 2025   Merge branch 'bump-max-args' into feature/multiplexer/store-transpile…
499cc8d  Jan 8, 2025   no longer required
82a7bcb  Jan 9, 2025   add docs
5a4ba11  Jan 9, 2025   fix typo
46a41af  Jan 10, 2025  Merge branch 'bump-max-args' into feature/multiplexer/transpile-using…
f840530  Jan 10, 2025  Merge branch 'feature/multiplexer/transpile-using-lsp' into feature/m…
7dabd9a  Jan 10, 2025  Merge branch 'feature/multiplexer/refactor-transpile-error' into feat…
0170b1f  Jan 10, 2025  Merge branch 'feature/multiplexer/display-lsp-diagnostics-in-console'…
cda3c47  Jan 22, 2025  Merge branch 'main' into feature/multiplexer/store-transpiled-files
c44a146  Jan 22, 2025  polishing
Files changed:
71 changes: 31 additions & 40 deletions src/databricks/labs/remorph/transpiler/execute.py
@@ -33,7 +33,7 @@
 logger = logging.getLogger(__name__)


-async def _process_file(
+async def _process_one_file(
     config: TranspileConfig,
     validator: Validator | None,
     transpiler: TranspileEngine,

@@ -71,29 +71,23 @@ async def _process_file(
     return transpile_result.success_count, error_list


-async def _process_directory(
+async def _process_many_files(
     config: TranspileConfig,
     validator: Validator | None,
     transpiler: TranspileEngine,
-    root: Path,
-    base_root: Path,
+    output_folder: Path,
     files: list[Path],
 ) -> tuple[int, list[TranspileError]]:

-    output_folder = config.output_folder
-    output_folder_base = root / ("transpiled" if output_folder is None else base_root)
-    make_dir(output_folder_base)
-

> ericvergnaud (Contributor Author): moved this logic upwards

     counter = 0
     all_errors: list[TranspileError] = []

     for file in files:
         logger.info(f"Processing file :{file}")
         if not is_sql_file(file):
             continue

-        output_file_name = output_folder_base / file.name
-        success_count, error_list = await _process_file(config, validator, transpiler, file, output_file_name)
+        output_file_name = output_folder / file.name
+        success_count, error_list = await _process_one_file(config, validator, transpiler, file, output_file_name)
         counter = counter + success_count
         all_errors.extend(error_list)

@@ -104,15 +98,17 @@ async def _process_input_dir(config: TranspileConfig, validator: Validator | Non
     error_list = []
     file_list = []
     counter = 0
-    input_source = str(config.input_source)
-    input_path = Path(input_source)
-    for root, _, files in dir_walk(input_path):
-        base_root = Path(str(root).replace(input_source, ""))
-        folder = str(input_path.resolve().joinpath(base_root))
-        msg = f"Processing for sqls under this folder: {folder}"
-        logger.info(msg)
+    input_path = config.input_path
+    output_folder = config.output_path
+    if output_folder is None:
+        output_folder = input_path.parent / "transpiled"
+    make_dir(output_folder)
+    for source_dir, _, files in dir_walk(input_path):
+        relative_path = cast(Path, source_dir).relative_to(input_path)
+        transpiled_dir = output_folder / relative_path
+        logger.info(f"Transpiling sql files from folder: {source_dir!s} into {transpiled_dir!s}")
         file_list.extend(files)
-        no_of_sqls, errors = await _process_directory(config, validator, transpiler, root, base_root, files)
+        no_of_sqls, errors = await _process_many_files(config, validator, transpiler, transpiled_dir, files)
         counter = counter + no_of_sqls
         error_list.extend(errors)
     return TranspileStatus(file_list, counter, error_list)
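
The reworked walk computes the output root once (defaulting to a sibling "transpiled" folder) and then mirrors the input tree beneath it. A minimal sketch of the path arithmetic, with illustrative paths that are not from the PR:

from pathlib import Path

input_path = Path("/work/queries")
output_folder = input_path.parent / "transpiled"      # default when config.output_path is None

# each walked sub-folder keeps its relative position under the output root
source_dir = Path("/work/queries/finance/monthly")
relative_path = source_dir.relative_to(input_path)    # Path("finance/monthly")
transpiled_dir = output_folder / relative_path        # Path("/work/transpiled/finance/monthly")
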
@@ -126,23 +122,21 @@ async def _process_input_file(
         logger.warning(msg)
         # silently ignore non-sql files
         return TranspileStatus([], 0, [])
-    msg = f"Processing sql from this file: {config.input_source}"
+    msg = f"Transpiling sql file: {config.input_path!s}"
     logger.info(msg)
-    if config.output_path is None:
+    output_path = config.output_path
+    if output_path is None:
         output_path = config.input_path.parent / "transpiled"
-    else:
-        output_path = config.output_path

     make_dir(output_path)
     output_file = output_path / config.input_path.name
-    no_of_sqls, error_list = await _process_file(config, validator, transpiler, config.input_path, output_file)
+    no_of_sqls, error_list = await _process_one_file(config, validator, transpiler, config.input_path, output_file)
     return TranspileStatus([config.input_path], no_of_sqls, error_list)


 @timeit
 async def transpile(
     workspace_client: WorkspaceClient, engine: TranspileEngine, config: TranspileConfig
-) -> tuple[list[dict[str, Any]], list[TranspileError]]:
+) -> tuple[dict[str, Any], list[TranspileError]]:

> ericvergnaud (Contributor Author): return a dict, we never have less or more than 1 status item

     await engine.initialize(config)
     status, errors = await _do_transpile(workspace_client, engine, config)
     await engine.shutdown()
@@ -151,7 +145,7 @@ async def transpile(

 async def _do_transpile(
     workspace_client: WorkspaceClient, engine: TranspileEngine, config: TranspileConfig
-) -> tuple[list[dict[str, Any]], list[TranspileError]]:
+) -> tuple[dict[str, Any], list[TranspileError]]:

> ericvergnaud (Contributor Author): return a dict, we never have less or more than 1 status item

     """
     [Experimental] Transpiles the SQL queries from one dialect to another.

@@ -163,8 +157,6 @@ async def _do_transpile(
         logger.error("Input SQL path is not provided.")
         raise ValueError("Input SQL path is not provided.")

-    status = []
-
     validator = None
     if not config.skip_validation:
         sql_backend = db_sql.get_sql_backend(workspace_client)
@@ -194,17 +186,16 @@
         with cast(Path, error_log_path).open("a", encoding="utf-8") as e:
             e.writelines(f"{err!s}\n" for err in result.error_list)

-    status.append(
-        {
-            "total_files_processed": len(result.file_list),
-            "total_queries_processed": result.no_of_transpiled_queries,
-            "no_of_sql_failed_while_analysing": result.analysis_error_count,
-            "no_of_sql_failed_while_parsing": result.parsing_error_count,
-            "no_of_sql_failed_while_generating": result.generation_error_count,
-            "no_of_sql_failed_while_validating": result.validation_error_count,
-            "error_log_file": str(error_log_path),
-        }
-    )
+    status = {
+        "total_files_processed": len(result.file_list),
+        "total_queries_processed": result.no_of_transpiled_queries,
+        "analysis_error_count": result.analysis_error_count,
+        "parsing_error_count": result.parsing_error_count,
+        "validation_error_count": result.validation_error_count,
+        "generation_error_count": result.generation_error_count,
+        "error_log_file": str(error_log_path),
+    }

> ericvergnaud (Contributor Author): align names

     return status, result.error_list
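
With the flattened return shape, callers receive the status dict directly instead of indexing a one-element list. A minimal sketch of consuming code, assumed for illustration rather than taken from the PR:

async def summarize(workspace_client, engine, config):
    status, errors = await transpile(workspace_client, engine, config)
    # status is a single dict with the keys shown above
    print(f"files processed: {status['total_files_processed']}")
    print(f"queries transpiled: {status['total_queries_processed']}")
    print(f"parsing errors: {status['parsing_error_count']}")
    print(f"error log: {status['error_log_file']}")
    return not errors
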
(second changed file; file name not shown in this capture)

@@ -80,7 +80,7 @@ async def transpile(
         read_dialect = get_dialect(source_dialect)
         error: TranspileError | None = self._check_supported(read_dialect, source_code, file_path)
         if error:
-            return TranspileResult(str(file_path), 1, [error])
+            return TranspileResult(source_code, 1, [error])

> ericvergnaud (Contributor Author): fix silent bug

         write_dialect = get_dialect(target_dialect)
         try:
             transpiled_expressions = transpile(
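
The one-line change matters because the first field of TranspileResult is treated as the transpiled code that gets written to the output file. A sketch of the difference, assuming those field semantics:

# before: on unsupported input the *path string* became the output payload,
# so the output file could silently end up containing "/work/queries/query1.sql"
TranspileResult(str(file_path), 1, [error])

# after: the original source passes through unchanged, alongside the error
TranspileResult(source_code, 1, [error])
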
142 changes: 142 additions & 0 deletions tests/unit/conftest.py
@@ -1,5 +1,6 @@
 import io
 import re
+import shutil
 from pathlib import Path
 from collections.abc import Sequence
 from unittest.mock import create_autospec
@@ -15,6 +16,7 @@
 from databricks.sdk.errors import NotFound

 from databricks.labs.remorph.config import TranspileConfig
+from databricks.labs.remorph.helpers.file_utils import make_dir
 from databricks.labs.remorph.transpiler.sqlglot.dialect_utils import SQLGLOT_DIALECTS
 from databricks.labs.remorph.transpiler.sqlglot.generator.databricks import Databricks
 from databricks.labs.remorph.transpiler.sqlglot.parsers.snowflake import Snowflake
@@ -270,3 +272,143 @@ def download(path: str) -> io.StringIO | io.BytesIO:
    config.cluster_id = None
    workspace_client.config = config
    return workspace_client


def safe_remove_dir(dir_path: Path):
    if dir_path.exists():
        shutil.rmtree(dir_path)


def safe_remove_file(file_path: Path):
    if file_path.exists():
        file_path.unlink()


def write_data_to_file(path: Path, content: str):
    with path.open("w") as writable:
        # added encoding to avoid UnicodeEncodeError while writing to file for token error test
        writable.write(content.encode("utf-8", "ignore").decode("utf-8"))


@pytest.fixture
def input_source(tmp_path: Path):
    input_dir = tmp_path / "remorph_source"
    query_1_sql_file = input_dir / "query1.sql"
    query_2_sql_file = input_dir / "query2.sql"
    query_3_sql_file = input_dir / "query3.sql"
    query_4_sql_file = input_dir / "query4.sql"
    query_5_sql_file = input_dir / "query5.sql"
    stream_1_sql_file = input_dir / "stream1.sql"
    call_center_ddl_file = input_dir / "call_center.ddl"
    file_text = input_dir / "file.txt"
    safe_remove_dir(input_dir)
    make_dir(input_dir)

query_1_sql = """select i_manufact, sum(ss_ext_sales_price) ext_price from date_dim, store_sales where
d_date_sk = ss_sold_date_sk and substr(ca_zip,1,5) <> substr(s_zip,1,5) group by i_manufact order by i_manufact
limit 100 ;"""

call_center_ddl = """create table call_center
(
cc_call_center_sk int ,
cc_call_center_id varchar(16)
)

CLUSTER BY(cc_call_center_sk)
"""

query_2_sql = """select wswscs.d_week_seq d_week_seq1,sun_sales sun_sales1,mon_sales mon_sales1 from wswscs,
date_dim where date_dim.d_week_seq = wswscs.d_week_seq and d_year = 2001"""

query_3_sql = """with wscs as
(select sold_date_sk
,sales_price
from (select ws_sold_date_sk sold_date_sk
,ws_ext_sales_price sales_price
from web_sales
union all
select cs_sold_date_sk sold_date_sk
,cs_ext_sales_price sales_price
from catalog_sales)),
wswscs as
(select d_week_seq,
sum(case when (d_day_name='Sunday') then sales_price else null end) sun_sales,
sum(case when (d_day_name='Monday') then sales_price else null end) mon_sales,
sum(case when (d_day_name='Tuesday') then sales_price else null end) tue_sales,
sum(case when (d_day_name='Wednesday') then sales_price else null end) wed_sales,
sum(case when (d_day_name='Thursday') then sales_price else null end) thu_sales,
sum(case when (d_day_name='Friday') then sales_price else null end) fri_sales,
sum(case when (d_day_name='Saturday') then sales_price else null end) sat_sales
from wscs
,date_dim
where d_date_sk = sold_date_sk
group by d_week_seq)
select d_week_seq1
,round(sun_sales1/sun_sales2,2)
,round(mon_sales1/mon_sales2,2)
,round(tue_sales1/tue_sales2,2)
,round(wed_sales1/wed_sales2,2)
,round(thu_sales1/thu_sales2,2)
,round(fri_sales1/fri_sales2,2)
,round(sat_sales1/sat_sales2,2)
from
(select wswscs.d_week_seq d_week_seq1
,sun_sales sun_sales1
,mon_sales mon_sales1
,tue_sales tue_sales1
,wed_sales wed_sales1
,thu_sales thu_sales1
,fri_sales fri_sales1
,sat_sales sat_sales1
from wswscs,date_dim
where date_dim.d_week_seq = wswscs.d_week_seq and
d_year = 2001) y,
(select wswscs.d_week_seq d_week_seq2
,sun_sales sun_sales2
,mon_sales mon_sales2
,tue_sales tue_sales2
,wed_sales wed_sales2
,thu_sales thu_sales2
,fri_sales fri_sales2
,sat_sales sat_sales2
from wswscs
,date_dim
where date_dim.d_week_seq = wswscs.d_week_seq2 and
d_year = 2001+1) z
where d_week_seq1=d_week_seq2-53
order by d_week_seq1;
"""

stream_1_sql = """CREATE STREAM unsupported_stream AS SELECT * FROM some_table;"""

query_4_sql = """create table(
col1 int
col2 string
);"""

query_5_sql = """1SELECT * from ~v\ud83d' table;"""

    write_data_to_file(query_1_sql_file, query_1_sql)
    write_data_to_file(call_center_ddl_file, call_center_ddl)
    write_data_to_file(query_2_sql_file, query_2_sql)
    write_data_to_file(query_3_sql_file, query_3_sql)
    write_data_to_file(query_4_sql_file, query_4_sql)
    write_data_to_file(query_5_sql_file, query_5_sql)
    write_data_to_file(stream_1_sql_file, stream_1_sql)
    write_data_to_file(file_text, "This is a test file")
    yield input_dir
    safe_remove_dir(input_dir)


@pytest.fixture
def output_folder(tmp_path: Path):
    output_dir = tmp_path / "remorph_transpiled"
    yield output_dir
    safe_remove_dir(output_dir)


@pytest.fixture
def error_file(tmp_path: Path):
    file_path = tmp_path / "transpile_errors.lst"
    yield file_path
    safe_remove_file(file_path)
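
A minimal sketch of how these fixtures compose in a test; the assertions are illustrative assumptions, not part of this PR:

from pathlib import Path

def test_fixture_layout(input_source: Path, output_folder: Path, error_file: Path):
    # input_source yields a populated folder of .sql/.ddl files plus one .txt file
    assert (input_source / "query1.sql").exists()
    assert (input_source / "call_center.ddl").exists()
    assert (input_source / "file.txt").exists()
    # output_folder and error_file are yielded as plain paths; the code under test
    # is expected to create them, and the fixtures clean them up afterwards
    assert not output_folder.exists()
    assert not error_file.exists()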