
Commit 577422d

#424 Fix ASCII reader corner case
1 parent dd52448 commit 577422d

File tree: 2 files changed, +70 -9 lines


cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala

Lines changed: 7 additions & 9 deletions

@@ -30,8 +30,10 @@ import java.util
   * Hopefully, comments will help anyone reading this.
   */
 class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
+  private val recordSize = ctx.copybook.getRecordSize
+
   // Maximum possible record size is the size of the copybook record + maximum size of the delimiter (2 characters for CRLF).
-  private val maxRecordSize = ctx.copybook.getRecordSize + 2
+  private val maxRecordSize = recordSize + 2
 
   // This is the buffer to keep the part of the stream that will be split by records.
   // The size of the array is always the maximum record size. The number of bytes that contain useful payload is specified
@@ -45,8 +47,6 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
   private var curRecordSize = 0
   // The number of bytes from pendingBytes that correspond to a record, without line break character(s)
   private var curPayloadSize = 0
-  // The number of bytes the line breaking character has taken for the last record. Can only be 1 (LF) or 2 (CR LF).
-  private var lastLineBreakSize = 1
 
   override def hasNext: Boolean = {
     if (!isRawRecordFound) {
@@ -126,21 +126,19 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
     } else {
       // Last record or a record is too large?
       // In the latter case
-      if (ctx.inputStream.isEndOfStream) {
+      if (pendingBytesSize <= recordSize && ctx.inputStream.isEndOfStream) {
         // Last record
         curRecordSize = pendingBytesSize
         curPayloadSize = pendingBytesSize
       } else {
         // This is an errors situation - no line breaks between records
-        // Return a record worth of data minus line break.
-        curRecordSize = pendingBytesSize - lastLineBreakSize
-        curPayloadSize = pendingBytesSize - lastLineBreakSize
+        // Return a record worth of data.
+        curRecordSize = recordSize
+        curPayloadSize = recordSize
       }
     }
 
     isRawRecordFound = true
-
-    lastLineBreakSize = recordLength - recordPayload
   }
 
   // This method extracts the current record from the buffer array.
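
The essence of the fix: when the buffered data contained no line break, the extractor previously sized the record using lastLineBreakSize, the delimiter length of the previous record, which mis-sizes records in files that have no EOL characters at all. After this change the decision depends only on the copybook record size and the end-of-stream flag: a leftover that fits within one record at end of stream is the last record; anything larger means records are not delimited and must be chopped at exactly one record's worth of bytes. A minimal sketch of the new decision as a standalone Scala function (the names pendingBytesSize, recordSize, and isEndOfStream mirror the extractor's state; this is an illustration, not the actual Cobrix code):

    // Sketch only: returns (curRecordSize, curPayloadSize) when no line break was found.
    def recordBoundsWithoutDelimiter(pendingBytesSize: Int,
                                     recordSize: Int,
                                     isEndOfStream: Boolean): (Int, Int) =
      if (pendingBytesSize <= recordSize && isEndOfStream) {
        // Last record of the file: everything left in the buffer is payload.
        (pendingBytesSize, pendingBytesSize)
      } else {
        // No line break between records: emit exactly one record worth of data.
        (recordSize, recordSize)
      }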

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test13AsciiCrLfText.scala

Lines changed: 63 additions & 0 deletions

@@ -126,5 +126,68 @@ class Test13AsciiCrLfText extends WordSpec with SparkTestBase with BinaryFileFix
         assert(count == 0)
       }
     }
+
+    "correctly read text files without EOL characters" in {
+      val text = "AABBCC"
+      withTempBinFile("crlf_empty", ".dat", text.getBytes()) { tmpFileName =>
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook)
+          .option("pedantic", "true")
+          .option("record_format", "D")
+          .load(tmpFileName)
+
+        val expected = """[{"A":"AA"},{"A":"BB"},{"A":"CC"}]"""
+
+        val count = df.count()
+        val actual = df.toJSON.collect().mkString("[", ",", "]")
+
+        assert(count == 3)
+        assertEqualsMultiline(actual, expected)
+      }
+    }
+  }
+
+  "correctly read text files with a single EOL characters" in {
+    val text = "AA\nBBCC"
+    withTempBinFile("crlf_empty", ".dat", text.getBytes()) { tmpFileName =>
+      val df = spark
+        .read
+        .format("cobol")
+        .option("copybook_contents", copybook)
+        .option("pedantic", "true")
+        .option("record_format", "D")
+        .load(tmpFileName)
+
+      val expected = """[{"A":"AA"},{"A":"BB"},{"A":"CC"}]"""
+
+      val count = df.count()
+      val actual = df.toJSON.collect().mkString("[", ",", "]")
+
+      assert(count == 3)
+      assertEqualsMultiline(actual, expected)
+    }
+  }
+
+  "correctly read text files with a double EOL characters" in {
+    val text = "AA\r\nBBCC"
+    withTempBinFile("crlf_empty", ".dat", text.getBytes()) { tmpFileName =>
+      val df = spark
+        .read
+        .format("cobol")
+        .option("copybook_contents", copybook)
+        .option("pedantic", "true")
+        .option("record_format", "D")
+        .load(tmpFileName)
+
+      val expected = """[{"A":"AA"},{"A":"BB"},{"A":"CC"}]"""
+
+      val count = df.count()
+      val actual = df.toJSON.collect().mkString("[", ",", "]")
+
+      assert(count == 3)
+      assertEqualsMultiline(actual, expected)
+    }
+  }
   }
 }
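
The three new tests exercise the same corner case with the same expected output: the copybook appears to describe a single 2-byte field A (judging by the expected JSON), and the inputs "AABBCC", "AA\nBBCC", and "AA\r\nBBCC" must each split into the records AA, BB, CC, whether adjacent records are separated by nothing, LF, or CRLF. A simplified, self-contained splitter illustrating that behavior under the fixed logic (a sketch under those assumptions, not the actual TextRecordExtractor):

    import scala.collection.mutable.ArrayBuffer

    // Sketch only: split ASCII text into records of a fixed size, tolerating missing EOLs.
    def splitRecords(data: Array[Byte], recordSize: Int): Seq[String] = {
      val records = ArrayBuffer.empty[String]
      var pos = 0
      while (pos < data.length) {
        // Look for a line break within one record plus the CRLF maximum (2 bytes).
        val window = data.slice(pos, math.min(pos + recordSize + 2, data.length))
        val brk = window.indexWhere(b => b == '\n' || b == '\r')
        if (brk >= 0) {
          // Delimited record: payload ends at the break; skip 1 (LF) or 2 (CRLF) bytes.
          records += new String(window.take(brk), "US-ASCII")
          val isCrLf = brk + 1 < window.length && window(brk) == '\r' && window(brk + 1) == '\n'
          pos += brk + (if (isCrLf) 2 else 1)
        } else if (window.length <= recordSize) {
          // Last record of the file, no trailing line break.
          records += new String(window, "US-ASCII")
          pos += window.length
        } else {
          // No line break between records: take exactly one record worth of data.
          records += new String(window.take(recordSize), "US-ASCII")
          pos += recordSize
        }
      }
      records.toSeq
    }

    // splitRecords("AABBCC".getBytes, 2)     => Seq("AA", "BB", "CC")
    // splitRecords("AA\nBBCC".getBytes, 2)   => Seq("AA", "BB", "CC")
    // splitRecords("AA\r\nBBCC".getBytes, 2) => Seq("AA", "BB", "CC")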
